From 156f53235fd00d8c77182ef155238414c1103028 Mon Sep 17 00:00:00 2001 From: Mytherin Date: Thu, 3 Jul 2025 16:44:43 +0200 Subject: [PATCH 1/3] Refactor S3 Multi part upload to separate class --- CMakeLists.txt | 3 +- .../httpfs/include/s3_multi_part_upload.hpp | 84 ++++++ extension/httpfs/include/s3fs.hpp | 73 +---- extension/httpfs/s3_multi_part_upload.cpp | 238 ++++++++++++++++ extension/httpfs/s3fs.cpp | 256 ++---------------- 5 files changed, 344 insertions(+), 310 deletions(-) create mode 100644 extension/httpfs/include/s3_multi_part_upload.hpp create mode 100644 extension/httpfs/s3_multi_part_upload.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 024cb72d..2ea85269 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -24,9 +24,10 @@ build_static_extension( extension/httpfs/http_state.cpp extension/httpfs/crypto.cpp extension/httpfs/hash_functions.cpp + extension/httpfs/s3_multi_part_upload.cpp extension/httpfs/create_secret_functions.cpp extension/httpfs/httpfs_extension.cpp - ${EXTRA_SOURCES} ) + ${EXTRA_SOURCES}) set(PARAMETERS "-warnings") build_loadable_extension( diff --git a/extension/httpfs/include/s3_multi_part_upload.hpp b/extension/httpfs/include/s3_multi_part_upload.hpp new file mode 100644 index 00000000..c1137710 --- /dev/null +++ b/extension/httpfs/include/s3_multi_part_upload.hpp @@ -0,0 +1,84 @@ +#pragma once + +#include "s3fs.hpp" +#include "duckdb/execution/task_error_manager.hpp" + +namespace duckdb { + +// Holds the buffered data for 1 part of an S3 Multipart upload +class S3WriteBuffer { +public: + explicit S3WriteBuffer(idx_t buffer_start, size_t buffer_size, BufferHandle buffer_p) + : idx(0), buffer_start(buffer_start), buffer(std::move(buffer_p)) { + buffer_end = buffer_start + buffer_size; + part_no = buffer_start / buffer_size; + uploading = false; + } + + void *Ptr() { + return buffer.Ptr(); + } + + // The S3 multipart part number. Note that internally we start at 0 but AWS S3 starts at 1 + idx_t part_no; + + idx_t idx; + idx_t buffer_start; + idx_t buffer_end; + BufferHandle buffer; + atomic uploading; +}; + +class S3MultiPartUpload : public enable_shared_from_this { +public: + S3MultiPartUpload(S3FileSystem &s3fs_p, S3FileHandle &file_handle_p); + +public: + static shared_ptr Initialize(S3FileHandle &file_handle); + + idx_t GetPartSize() const { + return part_size; + } + + shared_ptr GetBuffer(idx_t write_buffer_idx); + static void FlushBuffer(shared_ptr upload_state, shared_ptr write_buffer); + static void UploadBuffer(shared_ptr upload_state, shared_ptr write_buffer); + void Finalize(); + +private: + string InitializeMultipartUpload(); + void NotifyUploadsInProgress(); + void FlushAllBuffers(); + void FinalizeMultipartUpload(); + +private: + S3FileSystem &s3fs; + S3FileHandle &file_handle; + string path; + S3ConfigParams config_params; + string multipart_upload_id; + idx_t part_size; + + //! Write buffers for this file + mutex write_buffers_lock; + unordered_map> write_buffers; + + //! Synchronization for upload threads + mutex uploads_in_progress_lock; + std::condition_variable uploads_in_progress_cv; + std::condition_variable final_flush_cv; + uint16_t uploads_in_progress; + + //! Etags are stored for each part + mutex part_etags_lock; + unordered_map part_etags; + + //! Info for upload + atomic parts_uploaded; + atomic upload_finalized; + + //! 
Error handling in upload threads + TaskErrorManager error_manager; +}; + +} // namespace duckdb diff --git a/extension/httpfs/include/s3fs.hpp b/extension/httpfs/include/s3fs.hpp index 0a10b192..801338c3 100644 --- a/extension/httpfs/include/s3fs.hpp +++ b/extension/httpfs/include/s3fs.hpp @@ -80,29 +80,7 @@ struct S3ConfigParams { class S3FileSystem; -// Holds the buffered data for 1 part of an S3 Multipart upload -class S3WriteBuffer { -public: - explicit S3WriteBuffer(idx_t buffer_start, size_t buffer_size, BufferHandle buffer_p) - : idx(0), buffer_start(buffer_start), buffer(std::move(buffer_p)) { - buffer_end = buffer_start + buffer_size; - part_no = buffer_start / buffer_size; - uploading = false; - } - - void *Ptr() { - return buffer.Ptr(); - } - - // The S3 multipart part number. Note that internally we start at 0 but AWS S3 starts at 1 - idx_t part_no; - - idx_t idx; - idx_t buffer_start; - idx_t buffer_end; - BufferHandle buffer; - atomic uploading; -}; +class S3MultiPartUpload; class S3FileHandle : public HTTPFileHandle { friend class S3FileSystem; @@ -111,8 +89,7 @@ class S3FileHandle : public HTTPFileHandle { S3FileHandle(FileSystem &fs, const OpenFileInfo &file, FileOpenFlags flags, unique_ptr http_params_p, const S3AuthParams &auth_params_p, const S3ConfigParams &config_params_p) : HTTPFileHandle(fs, file, flags, std::move(http_params_p)), auth_params(auth_params_p), - config_params(config_params_p), uploads_in_progress(0), parts_uploaded(0), upload_finalized(false), - uploader_has_error(false), upload_exception(nullptr) { + config_params(config_params_p) { if (flags.OpenForReading() && flags.OpenForWriting()) { throw NotImplementedException("Cannot open an HTTP file for both reading and writing"); } else if (flags.OpenForAppending()) { @@ -123,47 +100,16 @@ class S3FileHandle : public HTTPFileHandle { S3AuthParams auth_params; const S3ConfigParams config_params; + shared_ptr multi_part_upload; public: void Close() override; void Initialize(optional_ptr opener) override; - shared_ptr GetBuffer(uint16_t write_buffer_idx); + void FinalizeUpload(); protected: - string multipart_upload_id; - size_t part_size; - - //! Write buffers for this file - mutex write_buffers_lock; - unordered_map> write_buffers; - - //! Synchronization for upload threads - mutex uploads_in_progress_lock; - std::condition_variable uploads_in_progress_cv; - std::condition_variable final_flush_cv; - uint16_t uploads_in_progress; - - //! Etags are stored for each part - mutex part_etags_lock; - unordered_map part_etags; - - //! Info for upload - atomic parts_uploaded; - bool upload_finalized = true; - - //! Error handling in upload threads - atomic uploader_has_error {false}; - std::exception_ptr upload_exception; - unique_ptr CreateClient() override; - - //! 
Rethrow IO Exception originating from an upload thread - void RethrowIOError() { - if (uploader_has_error) { - std::rethrow_exception(upload_exception); - } - } }; class S3FileSystem : public HTTPFileSystem { @@ -196,21 +142,12 @@ class S3FileSystem : public HTTPFileSystem { void FileSync(FileHandle &handle) override; void Write(FileHandle &handle, void *buffer, int64_t nr_bytes, idx_t location) override; - string InitializeMultipartUpload(S3FileHandle &file_handle); - void FinalizeMultipartUpload(S3FileHandle &file_handle); - - void FlushAllBuffers(S3FileHandle &handle); - void ReadQueryParams(const string &url_query_param, S3AuthParams ¶ms); static ParsedS3Url S3UrlParse(string url, S3AuthParams ¶ms); static string UrlEncode(const string &input, bool encode_slash = false); static string UrlDecode(string input); - // Uploads the contents of write_buffer to S3. - // Note: caller is responsible to not call this method twice on the same buffer - static void UploadBuffer(S3FileHandle &file_handle, shared_ptr write_buffer); - vector Glob(const string &glob_pattern, FileOpener *opener = nullptr) override; bool ListFiles(const string &directory, const std::function &callback, FileOpener *opener = nullptr) override; @@ -228,11 +165,9 @@ class S3FileSystem : public HTTPFileSystem { static HTTPException GetS3Error(S3AuthParams &s3_auth_params, const HTTPResponse &response, const string &url); protected: - static void NotifyUploadsInProgress(S3FileHandle &file_handle); duckdb::unique_ptr CreateHandle(const OpenFileInfo &file, FileOpenFlags flags, optional_ptr opener) override; - void FlushBuffer(S3FileHandle &handle, shared_ptr write_buffer); string GetPayloadHash(char *buffer, idx_t buffer_len); HTTPException GetHTTPError(FileHandle &, const HTTPResponse &response, const string &url) override; diff --git a/extension/httpfs/s3_multi_part_upload.cpp b/extension/httpfs/s3_multi_part_upload.cpp new file mode 100644 index 00000000..8ae903ab --- /dev/null +++ b/extension/httpfs/s3_multi_part_upload.cpp @@ -0,0 +1,238 @@ +#include "s3_multi_part_upload.hpp" +#include "duckdb/common/thread.hpp" + +namespace duckdb { + +S3MultiPartUpload::S3MultiPartUpload(S3FileSystem &s3fs_p, S3FileHandle &file_handle_p) + : s3fs(s3fs_p), file_handle(file_handle_p), path(file_handle.path), config_params(file_handle.config_params), + uploads_in_progress(0), parts_uploaded(0), upload_finalized(false) { +} + +shared_ptr S3MultiPartUpload::Initialize(S3FileHandle &file_handle) { + auto &config_params = file_handle.config_params; + + auto aws_minimum_part_size = 5242880; // 5 MiB https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html + auto max_part_count = config_params.max_parts_per_file; + auto required_part_size = config_params.max_file_size / max_part_count; + auto minimum_part_size = MaxValue(aws_minimum_part_size, required_part_size); + + auto &s3fs = file_handle.file_system.Cast(); + + auto upload_state = make_shared_ptr(s3fs, file_handle); + // Round part size up to multiple of Storage::DEFAULT_BLOCK_SIZE + upload_state->part_size = ((minimum_part_size + Storage::DEFAULT_BLOCK_SIZE - 1) / Storage::DEFAULT_BLOCK_SIZE) * + Storage::DEFAULT_BLOCK_SIZE; + D_ASSERT(upload_state->part_size * max_part_count >= config_params.max_file_size); + + upload_state->multipart_upload_id = upload_state->InitializeMultipartUpload(); + return upload_state; +} + +// Opens the multipart upload and returns the ID +string S3MultiPartUpload::InitializeMultipartUpload() { + string result; + string query_param = "uploads="; + auto 
res = s3fs.PostRequest(file_handle, path, {}, result, nullptr, 0, query_param); + + if (res->status != HTTPStatusCode::OK_200) { + throw HTTPException(*res, "Unable to connect to URL %s: %s (HTTP code %d)", res->url, res->GetError(), + static_cast(res->status)); + } + + auto open_tag_pos = result.find("", 0); + auto close_tag_pos = result.find("", open_tag_pos); + + if (open_tag_pos == string::npos || close_tag_pos == string::npos) { + throw HTTPException("Unexpected response while initializing S3 multipart upload"); + } + + open_tag_pos += 10; // Skip open tag + + return result.substr(open_tag_pos, close_tag_pos - open_tag_pos); +} + +shared_ptr S3MultiPartUpload::GetBuffer(idx_t write_buffer_idx) { + // Check if write buffer already exists + { + lock_guard lck(write_buffers_lock); + auto lookup_result = write_buffers.find(write_buffer_idx); + if (lookup_result != write_buffers.end()) { + return lookup_result->second; + } + } + + auto buffer_handle = s3fs.Allocate(part_size, config_params.max_upload_threads); + auto new_write_buffer = + make_shared_ptr(write_buffer_idx * part_size, part_size, std::move(buffer_handle)); + { + lock_guard lck(write_buffers_lock); + auto lookup_result = write_buffers.find(write_buffer_idx); + + // Check if other thread has created the same buffer, if so we return theirs and drop ours. + if (lookup_result != write_buffers.end()) { + // write_buffer_idx << std::endl; + return lookup_result->second; + } + write_buffers.emplace(write_buffer_idx, new_write_buffer); + } + + return new_write_buffer; +} + +void S3MultiPartUpload::NotifyUploadsInProgress() { + { + unique_lock lck(uploads_in_progress_lock); + uploads_in_progress--; + } + // Note that there are 2 cv's because otherwise we might deadlock when the final flushing thread is notified while + // another thread is still waiting for an upload thread + uploads_in_progress_cv.notify_one(); + final_flush_cv.notify_one(); +} + +void S3MultiPartUpload::UploadBuffer(shared_ptr upload_state, + shared_ptr write_buffer) { + auto &s3fs = upload_state->s3fs; + + string query_param = "partNumber=" + to_string(write_buffer->part_no + 1) + "&" + + "uploadId=" + S3FileSystem::UrlEncode(upload_state->multipart_upload_id, true); + unique_ptr res; + string etag; + + try { + res = s3fs.PutRequest(upload_state->file_handle, upload_state->path, {}, (char *)write_buffer->Ptr(), + write_buffer->idx, query_param); + + if (res->status != HTTPStatusCode::OK_200) { + throw HTTPException(*res, "Unable to connect to URL %s: %s (HTTP code %d)", res->url, res->GetError(), + static_cast(res->status)); + } + + if (!res->headers.HasHeader("ETag")) { + throw IOException("Unexpected response when uploading part to S3"); + } + etag = res->headers.GetHeaderValue("ETag"); + } catch (std::exception &ex) { + ErrorData error(ex); + if (error.Type() != ExceptionType::IO && error.Type() != ExceptionType::HTTP) { + throw; + } + upload_state->error_manager.PushError(std::move(error)); + + upload_state->NotifyUploadsInProgress(); + return; + } + + // Insert etag + { + unique_lock lck(upload_state->part_etags_lock); + upload_state->part_etags.insert(std::pair(write_buffer->part_no, etag)); + } + + upload_state->parts_uploaded++; + + // Free up space for another thread to acquire an S3WriteBuffer + write_buffer.reset(); + + upload_state->NotifyUploadsInProgress(); +} + +void S3MultiPartUpload::FlushBuffer(shared_ptr upload_state, + shared_ptr write_buffer) { + auto uploading = write_buffer->uploading.load(); + if (uploading) { + return; + } + bool can_upload 
= write_buffer->uploading.compare_exchange_strong(uploading, true); + if (!can_upload) { + return; + } + + if (upload_state->error_manager.HasError()) { + upload_state->error_manager.ThrowException(); + } + + { + unique_lock lck(upload_state->write_buffers_lock); + upload_state->write_buffers.erase(write_buffer->part_no); + } + + { + unique_lock lck(upload_state->uploads_in_progress_lock); + // check if there are upload threads available + if (upload_state->uploads_in_progress >= upload_state->config_params.max_upload_threads) { + // there are not - wait for one to become available + upload_state->uploads_in_progress_cv.wait(lck, [&] { + return upload_state->uploads_in_progress < upload_state->config_params.max_upload_threads; + }); + } + upload_state->uploads_in_progress++; + } + + thread upload_thread(UploadBuffer, upload_state, write_buffer); + upload_thread.detach(); +} + +// Note that FlushAll currently does not allow to continue writing afterwards. Therefore, FinalizeMultipartUpload should +// be called right after it! +// TODO: we can fix this by keeping the last partially written buffer in memory and allow reuploading it with new data. +void S3MultiPartUpload::FlushAllBuffers() { + // Collect references to all buffers to check + vector> to_flush; + write_buffers_lock.lock(); + for (auto &item : write_buffers) { + to_flush.push_back(item.second); + } + write_buffers_lock.unlock(); + + // Flush all buffers that aren't already uploading + for (auto &write_buffer : to_flush) { + if (!write_buffer->uploading) { + FlushBuffer(shared_from_this(), write_buffer); + } + } + unique_lock lck(uploads_in_progress_lock); + final_flush_cv.wait(lck, [&] { return uploads_in_progress == 0; }); + + if (error_manager.HasError()) { + error_manager.ThrowException(); + } +} + +void S3MultiPartUpload::FinalizeMultipartUpload() { + upload_finalized = true; + + std::stringstream ss; + ss << ""; + + auto parts = parts_uploaded.load(); + for (auto i = 0; i < parts; i++) { + auto etag_lookup = part_etags.find(i); + if (etag_lookup == part_etags.end()) { + throw IOException("Unknown part number"); + } + ss << "" << etag_lookup->second << "" << i + 1 << ""; + } + ss << ""; + string body = ss.str(); + + string result; + + string query_param = "uploadId=" + S3FileSystem::UrlEncode(multipart_upload_id, true); + auto res = + s3fs.PostRequest(file_handle, file_handle.path, {}, result, (char *)body.c_str(), body.length(), query_param); + auto open_tag_pos = result.find("(res->status), result); + } +} + +void S3MultiPartUpload::Finalize() { + FlushAllBuffers(); + if (parts_uploaded) { + FinalizeMultipartUpload(); + } +} + +} // namespace duckdb diff --git a/extension/httpfs/s3fs.cpp b/extension/httpfs/s3fs.cpp index be07524d..f1d22217 100644 --- a/extension/httpfs/s3fs.cpp +++ b/extension/httpfs/s3fs.cpp @@ -17,6 +17,7 @@ #include "duckdb/function/scalar/string_common.hpp" #include "duckdb/main/secret/secret_manager.hpp" #include "duckdb/storage/buffer_manager.hpp" +#include "s3_multi_part_upload.hpp" #include "create_secret_functions.hpp" @@ -264,12 +265,13 @@ S3ConfigParams S3ConfigParams::ReadFrom(optional_ptr opener) { } void S3FileHandle::Close() { - auto &s3fs = (S3FileSystem &)file_system; - if (flags.OpenForWriting() && !upload_finalized) { - s3fs.FlushAllBuffers(*this); - if (parts_uploaded) { - s3fs.FinalizeMultipartUpload(*this); - } + FinalizeUpload(); +} + +void S3FileHandle::FinalizeUpload() { + if (flags.OpenForWriting() && multi_part_upload) { + // wait for any multi-part uploads to finish + 
multi_part_upload->Finalize(); } } @@ -280,223 +282,11 @@ unique_ptr S3FileHandle::CreateClient() { return http_params.http_util.InitializeClient(http_params, proto_host_port); } -// Opens the multipart upload and returns the ID -string S3FileSystem::InitializeMultipartUpload(S3FileHandle &file_handle) { - auto &s3fs = (S3FileSystem &)file_handle.file_system; - - // AWS response is around 300~ chars in docs so this should be enough to not need a resize - string result; - string query_param = "uploads="; - auto res = s3fs.PostRequest(file_handle, file_handle.path, {}, result, nullptr, 0, query_param); - - if (res->status != HTTPStatusCode::OK_200) { - throw HTTPException(*res, "Unable to connect to URL %s: %s (HTTP code %d)", res->url, res->GetError(), - static_cast(res->status)); - } - - auto open_tag_pos = result.find("", 0); - auto close_tag_pos = result.find("", open_tag_pos); - - if (open_tag_pos == string::npos || close_tag_pos == string::npos) { - throw HTTPException("Unexpected response while initializing S3 multipart upload"); - } - - open_tag_pos += 10; // Skip open tag - - return result.substr(open_tag_pos, close_tag_pos - open_tag_pos); -} - -void S3FileSystem::NotifyUploadsInProgress(S3FileHandle &file_handle) { - { - unique_lock lck(file_handle.uploads_in_progress_lock); - file_handle.uploads_in_progress--; - } - // Note that there are 2 cv's because otherwise we might deadlock when the final flushing thread is notified while - // another thread is still waiting for an upload thread - file_handle.uploads_in_progress_cv.notify_one(); - file_handle.final_flush_cv.notify_one(); -} - -void S3FileSystem::UploadBuffer(S3FileHandle &file_handle, shared_ptr write_buffer) { - auto &s3fs = (S3FileSystem &)file_handle.file_system; - - string query_param = "partNumber=" + to_string(write_buffer->part_no + 1) + "&" + - "uploadId=" + S3FileSystem::UrlEncode(file_handle.multipart_upload_id, true); - unique_ptr res; - string etag; - - try { - res = s3fs.PutRequest(file_handle, file_handle.path, {}, (char *)write_buffer->Ptr(), write_buffer->idx, - query_param); - - if (res->status != HTTPStatusCode::OK_200) { - throw HTTPException(*res, "Unable to connect to URL %s: %s (HTTP code %d)", res->url, res->GetError(), - static_cast(res->status)); - } - - if (!res->headers.HasHeader("ETag")) { - throw IOException("Unexpected response when uploading part to S3"); - } - etag = res->headers.GetHeaderValue("ETag"); - } catch (std::exception &ex) { - ErrorData error(ex); - if (error.Type() != ExceptionType::IO && error.Type() != ExceptionType::HTTP) { - throw; - } - // Ensure only one thread sets the exception - bool f = false; - auto exchanged = file_handle.uploader_has_error.compare_exchange_strong(f, true); - if (exchanged) { - file_handle.upload_exception = std::current_exception(); - } - - NotifyUploadsInProgress(file_handle); - return; - } - - // Insert etag - { - unique_lock lck(file_handle.part_etags_lock); - file_handle.part_etags.insert(std::pair(write_buffer->part_no, etag)); - } - - file_handle.parts_uploaded++; - - // Free up space for another thread to acquire an S3WriteBuffer - write_buffer.reset(); - - NotifyUploadsInProgress(file_handle); -} - -void S3FileSystem::FlushBuffer(S3FileHandle &file_handle, shared_ptr write_buffer) { - if (write_buffer->idx == 0) { - return; - } - - auto uploading = write_buffer->uploading.load(); - if (uploading) { - return; - } - bool can_upload = write_buffer->uploading.compare_exchange_strong(uploading, true); - if (!can_upload) { - return; - } - - 
file_handle.RethrowIOError(); - - { - unique_lock lck(file_handle.write_buffers_lock); - file_handle.write_buffers.erase(write_buffer->part_no); - } - - { - unique_lock lck(file_handle.uploads_in_progress_lock); - // check if there are upload threads available - if (file_handle.uploads_in_progress >= file_handle.config_params.max_upload_threads) { - // there are not - wait for one to become available - file_handle.uploads_in_progress_cv.wait(lck, [&file_handle] { - return file_handle.uploads_in_progress < file_handle.config_params.max_upload_threads; - }); - } - file_handle.uploads_in_progress++; - } - - thread upload_thread(UploadBuffer, std::ref(file_handle), write_buffer); - upload_thread.detach(); -} - -// Note that FlushAll currently does not allow to continue writing afterwards. Therefore, FinalizeMultipartUpload should -// be called right after it! -// TODO: we can fix this by keeping the last partially written buffer in memory and allow reuploading it with new data. -void S3FileSystem::FlushAllBuffers(S3FileHandle &file_handle) { - // Collect references to all buffers to check - vector> to_flush; - file_handle.write_buffers_lock.lock(); - for (auto &item : file_handle.write_buffers) { - to_flush.push_back(item.second); - } - file_handle.write_buffers_lock.unlock(); - - // Flush all buffers that aren't already uploading - for (auto &write_buffer : to_flush) { - if (!write_buffer->uploading) { - FlushBuffer(file_handle, write_buffer); - } - } - unique_lock lck(file_handle.uploads_in_progress_lock); - file_handle.final_flush_cv.wait(lck, [&file_handle] { return file_handle.uploads_in_progress == 0; }); - - file_handle.RethrowIOError(); -} - -void S3FileSystem::FinalizeMultipartUpload(S3FileHandle &file_handle) { - auto &s3fs = (S3FileSystem &)file_handle.file_system; - file_handle.upload_finalized = true; - - std::stringstream ss; - ss << ""; - - auto parts = file_handle.parts_uploaded.load(); - for (auto i = 0; i < parts; i++) { - auto etag_lookup = file_handle.part_etags.find(i); - if (etag_lookup == file_handle.part_etags.end()) { - throw IOException("Unknown part number"); - } - ss << "" << etag_lookup->second << "" << i + 1 << ""; - } - ss << ""; - string body = ss.str(); - - // Response is around ~400 in AWS docs so this should be enough to not need a resize - string result; - - string query_param = "uploadId=" + S3FileSystem::UrlEncode(file_handle.multipart_upload_id, true); - auto res = - s3fs.PostRequest(file_handle, file_handle.path, {}, result, (char *)body.c_str(), body.length(), query_param); - auto open_tag_pos = result.find("(res->status), result); - } -} - // Wrapper around the BufferManager::Allocate to that allows limiting the number of buffers that will be handed out BufferHandle S3FileSystem::Allocate(idx_t part_size, uint16_t max_threads) { return buffer_manager.Allocate(MemoryTag::EXTENSION, part_size); } -shared_ptr S3FileHandle::GetBuffer(uint16_t write_buffer_idx) { - auto &s3fs = (S3FileSystem &)file_system; - - // Check if write buffer already exists - { - unique_lock lck(write_buffers_lock); - auto lookup_result = write_buffers.find(write_buffer_idx); - if (lookup_result != write_buffers.end()) { - shared_ptr buffer = lookup_result->second; - return buffer; - } - } - - auto buffer_handle = s3fs.Allocate(part_size, config_params.max_upload_threads); - auto new_write_buffer = - make_shared_ptr(write_buffer_idx * part_size, part_size, std::move(buffer_handle)); - { - unique_lock lck(write_buffers_lock); - auto lookup_result = 
write_buffers.find(write_buffer_idx); - - // Check if other thread has created the same buffer, if so we return theirs and drop ours. - if (lookup_result != write_buffers.end()) { - // write_buffer_idx << std::endl; - shared_ptr write_buffer = lookup_result->second; - return write_buffer; - } - write_buffers.insert(pair>(write_buffer_idx, new_write_buffer)); - } - - return new_write_buffer; -} - void GetQueryParam(const string &key, string ¶m, unordered_map &query_params) { auto found_param = query_params.find(key); if (found_param != query_params.end()) { @@ -756,20 +546,8 @@ void S3FileHandle::Initialize(optional_ptr opener) { HTTPFileHandle::Initialize(opener); } - auto &s3fs = file_system.Cast(); - if (flags.OpenForWriting()) { - auto aws_minimum_part_size = 5242880; // 5 MiB https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html - auto max_part_count = config_params.max_parts_per_file; - auto required_part_size = config_params.max_file_size / max_part_count; - auto minimum_part_size = MaxValue(aws_minimum_part_size, required_part_size); - - // Round part size up to multiple of Storage::DEFAULT_BLOCK_SIZE - part_size = ((minimum_part_size + Storage::DEFAULT_BLOCK_SIZE - 1) / Storage::DEFAULT_BLOCK_SIZE) * - Storage::DEFAULT_BLOCK_SIZE; - D_ASSERT(part_size * max_part_count >= config_params.max_file_size); - - multipart_upload_id = s3fs.InitializeMultipartUpload(*this); + multi_part_upload = S3MultiPartUpload::Initialize(*this); } } @@ -813,10 +591,7 @@ void S3FileSystem::RemoveDirectory(const string &path, optional_ptr void S3FileSystem::FileSync(FileHandle &handle) { auto &s3fh = handle.Cast(); - if (!s3fh.upload_finalized) { - FlushAllBuffers(s3fh); - FinalizeMultipartUpload(s3fh); - } + s3fh.FinalizeUpload(); } void S3FileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes, idx_t location) { @@ -834,20 +609,21 @@ void S3FileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes, idx } // Find buffer for writing - auto write_buffer_idx = curr_location / s3fh.part_size; + auto part_size = s3fh.multi_part_upload->GetPartSize(); + auto write_buffer_idx = curr_location / part_size; // Get write buffer, may block until buffer is available - auto write_buffer = s3fh.GetBuffer(write_buffer_idx); + auto write_buffer = s3fh.multi_part_upload->GetBuffer(write_buffer_idx); // Writing to buffer auto idx_to_write = curr_location - write_buffer->buffer_start; - auto bytes_to_write = MinValue(nr_bytes - bytes_written, s3fh.part_size - idx_to_write); + auto bytes_to_write = MinValue(nr_bytes - bytes_written, part_size - idx_to_write); memcpy((char *)write_buffer->Ptr() + idx_to_write, (char *)buffer + bytes_written, bytes_to_write); write_buffer->idx += bytes_to_write; // Flush to HTTP if full - if (write_buffer->idx >= s3fh.part_size) { - FlushBuffer(s3fh, write_buffer); + if (write_buffer->idx >= part_size) { + S3MultiPartUpload::FlushBuffer(s3fh.multi_part_upload, write_buffer); } s3fh.file_offset += bytes_to_write; bytes_written += bytes_to_write; From ca5b1bc723d727eac85f3ec84726dbe8fe4eb5b6 Mon Sep 17 00:00:00 2001 From: Mytherin Date: Thu, 3 Jul 2025 17:21:34 +0200 Subject: [PATCH 2/3] Detach HTTPInput from file handle so S3MultiPartUpload can live independently of the file handle --- extension/httpfs/httpfs.cpp | 29 +++++++------ extension/httpfs/include/httpfs.hpp | 27 ++++++++++-- .../httpfs/include/s3_multi_part_upload.hpp | 2 +- extension/httpfs/include/s3fs.hpp | 25 +++++------ extension/httpfs/s3_multi_part_upload.cpp | 8 ++-- extension/httpfs/s3fs.cpp 
| 41 +++++++++++++------ 6 files changed, 88 insertions(+), 44 deletions(-) diff --git a/extension/httpfs/httpfs.cpp b/extension/httpfs/httpfs.cpp index 20f85a69..d410526c 100644 --- a/extension/httpfs/httpfs.cpp +++ b/extension/httpfs/httpfs.cpp @@ -96,23 +96,21 @@ void HTTPClientCache::StoreClient(unique_ptr client) { clients.push_back(std::move(client)); } -unique_ptr HTTPFileSystem::PostRequest(FileHandle &handle, string url, HTTPHeaders header_map, +unique_ptr HTTPFileSystem::PostRequest(HTTPInput &input, string url, HTTPHeaders header_map, string &buffer_out, char *buffer_in, idx_t buffer_in_len, string params) { - auto &hfh = handle.Cast(); - auto &http_util = hfh.http_params.http_util; - PostRequestInfo post_request(url, header_map, hfh.http_params, const_data_ptr_cast(buffer_in), buffer_in_len); + auto &http_util = input.http_params.http_util; + PostRequestInfo post_request(url, header_map, input.http_params, const_data_ptr_cast(buffer_in), buffer_in_len); auto result = http_util.Request(post_request); buffer_out = std::move(post_request.buffer_out); return result; } -unique_ptr HTTPFileSystem::PutRequest(FileHandle &handle, string url, HTTPHeaders header_map, +unique_ptr HTTPFileSystem::PutRequest(HTTPInput &input, string url, HTTPHeaders header_map, char *buffer_in, idx_t buffer_in_len, string params) { - auto &hfh = handle.Cast(); - auto &http_util = hfh.http_params.http_util; + auto &http_util = input.http_params.http_util; string content_type = "application/octet-stream"; - PutRequestInfo put_request(url, header_map, hfh.http_params, (const_data_ptr_t)buffer_in, buffer_in_len, + PutRequestInfo put_request(url, header_map, input.http_params, (const_data_ptr_t)buffer_in, buffer_in_len, content_type); return http_util.Request(put_request); } @@ -272,10 +270,11 @@ void TimestampToTimeT(timestamp_t timestamp, time_t &result) { result = mktime(&tm); } -HTTPFileHandle::HTTPFileHandle(FileSystem &fs, const OpenFileInfo &file, FileOpenFlags flags, - unique_ptr params_p) - : FileHandle(fs, file.path, flags), params(std::move(params_p)), http_params(params->Cast()), - flags(flags), length(0), force_full_download(false), buffer_available(0), buffer_idx(0), file_offset(0), buffer_start(0), buffer_end(0) { +HTTPInput::HTTPInput(unique_ptr params_p) : params(std::move(params_p)), http_params(params->Cast()) {} + +HTTPFileHandle::HTTPFileHandle(FileSystem &fs, const OpenFileInfo &file, FileOpenFlags flags, shared_ptr input_p) + : FileHandle(fs, file.path, flags), http_input(std::move(input_p)), http_params(http_input->http_params), + flags(flags), length(0), force_full_download(false), buffer_available(0), buffer_idx(0), file_offset(0), buffer_start(0), buffer_end(0) { // check if the handle has extended properties that can be set directly in the handle // if we have these properties we don't need to do a head request to obtain them later if (file.extended_info) { @@ -303,6 +302,12 @@ HTTPFileHandle::HTTPFileHandle(FileSystem &fs, const OpenFileInfo &file, FileOpe } } } + +HTTPFileHandle::HTTPFileHandle(FileSystem &fs, const OpenFileInfo &file, FileOpenFlags flags, + unique_ptr params_p) : + HTTPFileHandle(fs, file, flags, make_shared_ptr(std::move(params_p))) { +} + unique_ptr HTTPFileSystem::CreateHandle(const OpenFileInfo &file, FileOpenFlags flags, optional_ptr opener) { D_ASSERT(flags.Compression() == FileCompressionType::UNCOMPRESSED); diff --git a/extension/httpfs/include/httpfs.hpp b/extension/httpfs/include/httpfs.hpp index 62067d46..83a8b71a 100644 --- 
a/extension/httpfs/include/httpfs.hpp +++ b/extension/httpfs/include/httpfs.hpp @@ -28,11 +28,32 @@ class HTTPClientCache { mutex lock; }; +class HTTPInput { +public: + HTTPInput(unique_ptr params_p); + virtual ~HTTPInput() = default; + + unique_ptr params; + HTTPFSParams &http_params; + + template + TARGET &Cast() { + DynamicCastCheck(this); + return reinterpret_cast(*this); + } + template + const TARGET &Cast() const { + DynamicCastCheck(this); + return reinterpret_cast(*this); + } +}; + class HTTPFileSystem; class HTTPFileHandle : public FileHandle { public: HTTPFileHandle(FileSystem &fs, const OpenFileInfo &file, FileOpenFlags flags, unique_ptr params); + HTTPFileHandle(FileSystem &fs, const OpenFileInfo &file, FileOpenFlags flags, shared_ptr input); ~HTTPFileHandle() override; // This two-phase construction allows subclasses more flexible setup. virtual void Initialize(optional_ptr opener); @@ -40,7 +61,7 @@ class HTTPFileHandle : public FileHandle { // We keep an http client stored for connection reuse with keep-alive headers HTTPClientCache client_cache; - unique_ptr params; + shared_ptr http_input; HTTPFSParams &http_params; // File handle info @@ -106,10 +127,10 @@ class HTTPFileSystem : public FileSystem { // Get Request without a range (i.e., downloads full file) virtual duckdb::unique_ptr GetRequest(FileHandle &handle, string url, HTTPHeaders header_map); // Post Request that can handle variable sized responses without a content-length header (needed for s3 multipart) - virtual duckdb::unique_ptr PostRequest(FileHandle &handle, string url, HTTPHeaders header_map, + virtual duckdb::unique_ptr PostRequest(HTTPInput &input, string url, HTTPHeaders header_map, string &result, char *buffer_in, idx_t buffer_in_len, string params = ""); - virtual duckdb::unique_ptr PutRequest(FileHandle &handle, string url, HTTPHeaders header_map, + virtual duckdb::unique_ptr PutRequest(HTTPInput &input, string url, HTTPHeaders header_map, char *buffer_in, idx_t buffer_in_len, string params = ""); virtual duckdb::unique_ptr DeleteRequest(FileHandle &handle, string url, HTTPHeaders header_map); diff --git a/extension/httpfs/include/s3_multi_part_upload.hpp b/extension/httpfs/include/s3_multi_part_upload.hpp index c1137710..e8d85470 100644 --- a/extension/httpfs/include/s3_multi_part_upload.hpp +++ b/extension/httpfs/include/s3_multi_part_upload.hpp @@ -53,7 +53,7 @@ class S3MultiPartUpload : public enable_shared_from_this { private: S3FileSystem &s3fs; - S3FileHandle &file_handle; + shared_ptr http_input; string path; S3ConfigParams config_params; string multipart_upload_id; diff --git a/extension/httpfs/include/s3fs.hpp b/extension/httpfs/include/s3fs.hpp index 801338c3..42a60846 100644 --- a/extension/httpfs/include/s3fs.hpp +++ b/extension/httpfs/include/s3fs.hpp @@ -78,6 +78,15 @@ struct S3ConfigParams { static S3ConfigParams ReadFrom(optional_ptr opener); }; +class S3HTTPInput : public HTTPInput { +public: + S3HTTPInput(unique_ptr params, const S3AuthParams &auth_params_p, const S3ConfigParams &config_params_p); + ~S3HTTPInput() override; + + S3AuthParams auth_params; + S3ConfigParams config_params; +}; + class S3FileSystem; class S3MultiPartUpload; @@ -87,18 +96,10 @@ class S3FileHandle : public HTTPFileHandle { public: S3FileHandle(FileSystem &fs, const OpenFileInfo &file, FileOpenFlags flags, unique_ptr http_params_p, - const S3AuthParams &auth_params_p, const S3ConfigParams &config_params_p) - : HTTPFileHandle(fs, file, flags, std::move(http_params_p)), auth_params(auth_params_p), - 
config_params(config_params_p) { - if (flags.OpenForReading() && flags.OpenForWriting()) { - throw NotImplementedException("Cannot open an HTTP file for both reading and writing"); - } else if (flags.OpenForAppending()) { - throw NotImplementedException("Cannot open an HTTP file for appending"); - } - } + const S3AuthParams &auth_params_p, const S3ConfigParams &config_params_p); ~S3FileHandle() override; - S3AuthParams auth_params; + S3AuthParams &auth_params; const S3ConfigParams config_params; shared_ptr multi_part_upload; @@ -126,10 +127,10 @@ class S3FileSystem : public HTTPFileSystem { duckdb::unique_ptr GetRangeRequest(FileHandle &handle, string s3_url, HTTPHeaders header_map, idx_t file_offset, char *buffer_out, idx_t buffer_out_len) override; - duckdb::unique_ptr PostRequest(FileHandle &handle, string s3_url, HTTPHeaders header_map, + duckdb::unique_ptr PostRequest(HTTPInput &input, string s3_url, HTTPHeaders header_map, string &buffer_out, char *buffer_in, idx_t buffer_in_len, string http_params = "") override; - duckdb::unique_ptr PutRequest(FileHandle &handle, string s3_url, HTTPHeaders header_map, + duckdb::unique_ptr PutRequest(HTTPInput &input, string s3_url, HTTPHeaders header_map, char *buffer_in, idx_t buffer_in_len, string http_params = "") override; duckdb::unique_ptr DeleteRequest(FileHandle &handle, string s3_url, HTTPHeaders header_map) override; diff --git a/extension/httpfs/s3_multi_part_upload.cpp b/extension/httpfs/s3_multi_part_upload.cpp index 8ae903ab..f5e7c4e0 100644 --- a/extension/httpfs/s3_multi_part_upload.cpp +++ b/extension/httpfs/s3_multi_part_upload.cpp @@ -4,7 +4,7 @@ namespace duckdb { S3MultiPartUpload::S3MultiPartUpload(S3FileSystem &s3fs_p, S3FileHandle &file_handle_p) - : s3fs(s3fs_p), file_handle(file_handle_p), path(file_handle.path), config_params(file_handle.config_params), + : s3fs(s3fs_p), http_input(file_handle_p.http_input), path(file_handle_p.path), config_params(file_handle_p.config_params), uploads_in_progress(0), parts_uploaded(0), upload_finalized(false) { } @@ -32,7 +32,7 @@ shared_ptr S3MultiPartUpload::Initialize(S3FileHandle &file_h string S3MultiPartUpload::InitializeMultipartUpload() { string result; string query_param = "uploads="; - auto res = s3fs.PostRequest(file_handle, path, {}, result, nullptr, 0, query_param); + auto res = s3fs.PostRequest(*http_input, path, {}, result, nullptr, 0, query_param); if (res->status != HTTPStatusCode::OK_200) { throw HTTPException(*res, "Unable to connect to URL %s: %s (HTTP code %d)", res->url, res->GetError(), @@ -100,7 +100,7 @@ void S3MultiPartUpload::UploadBuffer(shared_ptr upload_state, string etag; try { - res = s3fs.PutRequest(upload_state->file_handle, upload_state->path, {}, (char *)write_buffer->Ptr(), + res = s3fs.PutRequest(*upload_state->http_input, upload_state->path, {}, (char *)write_buffer->Ptr(), write_buffer->idx, query_param); if (res->status != HTTPStatusCode::OK_200) { @@ -220,7 +220,7 @@ void S3MultiPartUpload::FinalizeMultipartUpload() { string query_param = "uploadId=" + S3FileSystem::UrlEncode(multipart_upload_id, true); auto res = - s3fs.PostRequest(file_handle, file_handle.path, {}, result, (char *)body.c_str(), body.length(), query_param); + s3fs.PostRequest(*http_input, path, {}, result, (char *)body.c_str(), body.length(), query_param); auto open_tag_pos = result.find(" CreateSecret(vector &prefix_paths_p, string & return return_value; } +S3HTTPInput::S3HTTPInput(unique_ptr params_p, const S3AuthParams &auth_params_p, const S3ConfigParams &config_params_p) : + 
HTTPInput(std::move(params_p)), auth_params(auth_params_p), config_params(config_params_p) {} + +S3HTTPInput::~S3HTTPInput() { +} + +S3FileHandle::S3FileHandle(FileSystem &fs, const OpenFileInfo &file, FileOpenFlags flags, unique_ptr http_params_p, + const S3AuthParams &auth_params_p, const S3ConfigParams &config_params_p) + : HTTPFileHandle(fs, file, flags, make_shared_ptr(std::move(http_params_p), auth_params_p, config_params_p)), auth_params(http_input->Cast().auth_params), + config_params(http_input->Cast().config_params) { + if (flags.OpenForReading() && flags.OpenForWriting()) { + throw NotImplementedException("Cannot open an HTTP file for both reading and writing"); + } else if (flags.OpenForAppending()) { + throw NotImplementedException("Cannot open an HTTP file for appending"); + } +} + S3FileHandle::~S3FileHandle() { if (Exception::UncaughtException()) { // We are in an exception, don't do anything @@ -426,30 +443,30 @@ string ParsedS3Url::GetHTTPUrl(S3AuthParams &auth_params, const string &http_que return full_url; } -unique_ptr S3FileSystem::PostRequest(FileHandle &handle, string url, HTTPHeaders header_map, +unique_ptr S3FileSystem::PostRequest(HTTPInput &input, string url, HTTPHeaders header_map, string &result, char *buffer_in, idx_t buffer_in_len, string http_params) { - auto auth_params = handle.Cast().auth_params; - auto parsed_s3_url = S3UrlParse(url, auth_params); - string http_url = parsed_s3_url.GetHTTPUrl(auth_params, http_params); + auto &s3_input = input.Cast(); + auto parsed_s3_url = S3UrlParse(url, s3_input.auth_params); + string http_url = parsed_s3_url.GetHTTPUrl(s3_input.auth_params, http_params); auto payload_hash = GetPayloadHash(buffer_in, buffer_in_len); - auto headers = create_s3_header(parsed_s3_url.path, http_params, parsed_s3_url.host, "s3", "POST", auth_params, "", + auto headers = create_s3_header(parsed_s3_url.path, http_params, parsed_s3_url.host, "s3", "POST", s3_input.auth_params, "", "", payload_hash, "application/octet-stream"); - return HTTPFileSystem::PostRequest(handle, http_url, headers, result, buffer_in, buffer_in_len); + return HTTPFileSystem::PostRequest(input, http_url, headers, result, buffer_in, buffer_in_len); } -unique_ptr S3FileSystem::PutRequest(FileHandle &handle, string url, HTTPHeaders header_map, +unique_ptr S3FileSystem::PutRequest(HTTPInput &input, string url, HTTPHeaders header_map, char *buffer_in, idx_t buffer_in_len, string http_params) { - auto auth_params = handle.Cast().auth_params; - auto parsed_s3_url = S3UrlParse(url, auth_params); - string http_url = parsed_s3_url.GetHTTPUrl(auth_params, http_params); + auto &s3_input = input.Cast(); + auto parsed_s3_url = S3UrlParse(url, s3_input.auth_params); + string http_url = parsed_s3_url.GetHTTPUrl(s3_input.auth_params, http_params); auto content_type = "application/octet-stream"; auto payload_hash = GetPayloadHash(buffer_in, buffer_in_len); - auto headers = create_s3_header(parsed_s3_url.path, http_params, parsed_s3_url.host, "s3", "PUT", auth_params, "", + auto headers = create_s3_header(parsed_s3_url.path, http_params, parsed_s3_url.host, "s3", "PUT", s3_input.auth_params, "", "", payload_hash, content_type); - return HTTPFileSystem::PutRequest(handle, http_url, headers, buffer_in, buffer_in_len); + return HTTPFileSystem::PutRequest(input, http_url, headers, buffer_in, buffer_in_len); } unique_ptr S3FileSystem::HeadRequest(FileHandle &handle, string s3_url, HTTPHeaders header_map) { From 7d63c3d99db51056d3b4d433dd303757e31b879d Mon Sep 17 00:00:00 2001 From: Mytherin 
Date: Thu, 3 Jul 2025 17:26:01 +0200
Subject: [PATCH 3/3] Avoid finalizing multiple times

---
 extension/httpfs/s3_multi_part_upload.cpp | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/extension/httpfs/s3_multi_part_upload.cpp b/extension/httpfs/s3_multi_part_upload.cpp
index f5e7c4e0..d4593bdd 100644
--- a/extension/httpfs/s3_multi_part_upload.cpp
+++ b/extension/httpfs/s3_multi_part_upload.cpp
@@ -229,6 +229,10 @@ void S3MultiPartUpload::FinalizeMultipartUpload() {
 }
 
 void S3MultiPartUpload::Finalize() {
+	if (upload_finalized) {
+		// already finalized
+		return;
+	}
 	FlushAllBuffers();
 	if (parts_uploaded) {
 		FinalizeMultipartUpload();
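
Taken together, the three patches leave the S3 write path driven through the new S3MultiPartUpload class. The sketch below is illustrative only and is not part of the series: it restates, using the names introduced in the patches, how S3FileSystem::Write and the handle teardown use the class. The helper name WriteThroughMultiPartUpload and the surrounding setup are assumptions made for the example.

#include "s3fs.hpp"
#include "s3_multi_part_upload.hpp"
#include <cstring>

namespace duckdb {

// Hypothetical helper mirroring the body of S3FileSystem::Write after the refactor.
// Assumes `s3fh` was opened for writing, so S3FileHandle::Initialize() has already
// created the upload state and stored it in s3fh.multi_part_upload.
static void WriteThroughMultiPartUpload(S3FileHandle &s3fh, const char *data, idx_t nr_bytes) {
	auto part_size = s3fh.multi_part_upload->GetPartSize();
	idx_t bytes_written = 0;
	while (bytes_written < nr_bytes) {
		// S3 writes are sequential, so the current offset selects which part to fill
		auto curr_location = s3fh.file_offset;
		auto write_buffer = s3fh.multi_part_upload->GetBuffer(curr_location / part_size);

		// Copy into the buffered part; each part covers a fixed part_size byte range
		auto idx_to_write = curr_location - write_buffer->buffer_start;
		auto bytes_to_write = MinValue<idx_t>(nr_bytes - bytes_written, part_size - idx_to_write);
		memcpy((char *)write_buffer->Ptr() + idx_to_write, data + bytes_written, bytes_to_write);
		write_buffer->idx += bytes_to_write;

		// A full part is handed off to a detached upload thread; upload errors are
		// collected in the TaskErrorManager and rethrown on a later flush.
		if (write_buffer->idx >= part_size) {
			S3MultiPartUpload::FlushBuffer(s3fh.multi_part_upload, write_buffer);
		}
		s3fh.file_offset += bytes_to_write;
		bytes_written += bytes_to_write;
	}
	// Close()/FileSync() later call S3FileHandle::FinalizeUpload(), which invokes
	// Finalize(): remaining buffers are flushed and, with patch 3, the multipart
	// upload is completed at most once.
}

} // namespace duckdb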