diff --git a/mooncake-store/include/client.h b/mooncake-store/include/client.h index 2aefd76c7..138858387 100644 --- a/mooncake-store/include/client.h +++ b/mooncake-store/include/client.h @@ -247,6 +247,61 @@ class Client { std::vector> BatchIsExist( const std::vector& keys); + /** + * @brief Mounts a file storage segment into the master. + * @param segment_name Unique identifier for the storage segment to mount. + * @param local_rpc_addr Local RPC address (e.g., "ip:port") where the + * segment will listen for requests. + * @param enable_offloading If true, enables offloading (write-to-file). + */ + tl::expected MountFileStorage( + const std::string& segment_name, const std::string& local_rpc_addr, + bool enable_offloading); + + /** + * @brief Heartbeat call to collect object-level statistics and retrieve the + * set of non-offloaded objects. + * @param segment_name Name of the storage segment associated with this + * heartbeat. + * @param enable_offloading Indicates whether offloading is enabled for this + * segment. + * @param offloading_objects On return, contains a map from object key to + * size (in bytes) for all objects that require offload. + */ + tl::expected OffloadObjectHeartbeat( + const std::string& segment_name, bool enable_offloading, + std::unordered_map& offloading_objects); + + /** + * @brief Performs a batched write of multiple objects using a + * high-throughput Transfer Engine. + * @param transfer_engine_addr Address of the Transfer Engine service (e.g., + * "ip:port"). + * @param keys List of keys identifying the data objects to be transferred + * @param pointers Array of destination memory addresses on the remote node + * where data will be written (one per key) + * @param batched_slices Map from object key to its data slice + * (`mooncake::Slice`), containing raw bytes to be written. + */ + tl::expected BatchPutOffloadObject( + const std::string& transfer_engine_addr, + const std::vector& keys, + const std::vector& pointers, + const std::unordered_map& batched_slices); + + /** + * @brief Notifies the master that offloading of specified objects has + * succeeded. + * @param segment_name Name of the target storage segment to which objects + * will be added. 
+ * @param keys List of object keys that were successfully offloaded + * @param metadatas Corresponding metadata for each key (e.g., size, + * hash, timestamp) + */ + tl::expected NotifyOffloadSuccess( + const std::string& segment_name, const std::vector& keys, + const std::vector& metadatas); + // For human-readable metrics tl::expected GetSummaryMetrics() { if (metrics_ == nullptr) { diff --git a/mooncake-store/include/file_storage.h b/mooncake-store/include/file_storage.h new file mode 100644 index 000000000..824a22104 --- /dev/null +++ b/mooncake-store/include/file_storage.h @@ -0,0 +1,182 @@ +#pragma once + +#include "client.h" +#include "client_buffer.hpp" +#include "storage_backend.h" +#include "../tests/utils/common.h" + +namespace mooncake { + +struct FileStorageConfig { + // Path where data files are stored on disk + std::string storage_filepath = "/data/file_storage"; + + // Size of the local client-side buffer (used for caching or batching) + int64_t local_buffer_size = 1280 * 1024 * 1024; // ~1.2 GB + + // Limits for scanning and iteration operations + int64_t bucket_iterator_keys_limit = + 20000; // Max number of keys returned per Scan call + int64_t bucket_keys_limit = + 500; // Max number of keys allowed in a single bucket + int64_t bucket_size_limit = + 256 * 1024 * 1024; // Max total size of a single bucket (256 MB) + + // Global limits across all buckets + int64_t total_keys_limit = 10'000'000; // Maximum total number of keys + int64_t total_size_limit = + 2ULL * 1024 * 1024 * 1024 * 1024; // Maximum total storage size (2 TB) + + // Interval between heartbeats sent to the control plane (in seconds) + uint32_t heartbeat_interval_seconds = 10; + + // Validates the configuration for correctness and consistency + bool Validate() const; + + /** + * @brief Creates a config instance by reading values from environment + * variables. + * + * Uses default values if environment variables are not set or invalid. + * This is a static factory method for easy configuration loading. + * + * @return FileStorageConfig with values from env or defaults + */ + static FileStorageConfig FromEnvironment(); + + static std::string GetEnvStringOr(const char* name, + const std::string& default_value); + + template + static T GetEnvOr(const char* name, T default_value); +}; + +class BucketIterator { + public: + BucketIterator(std::shared_ptr storage_backend, + int64_t limit); + + tl::expected HandleNext( + const std::function< + ErrorCode(const std::vector& keys, + const std::vector& metadatas, + const std::vector& buckets)>& handler); + + tl::expected HasNext(); + + private: + std::shared_ptr storage_backend_; + int64_t limit_; + mutable Mutex mutex_; + int64_t GUARDED_BY(mutex_) next_bucket_ = -1; +}; + +class FileStorage { + public: + FileStorage(std::shared_ptr client, const std::string& segment_name, + const std::string& local_rpc_addr, + const FileStorageConfig& config); + ~FileStorage(); + + tl::expected Init(); + + /** + * @brief Reads multiple key-value (KV) entries from local storage and + * forwards them to a remote node. + * @param transfer_engine_addr Address of the remote transfer engine + * (format: "ip:port") + * @param keys List of keys to read from the local KV store + * @param pointers Array of remote memory base addresses (on the + * destination node) where each corresponding value will be written + * @param sizes Expected size in bytes for each value + * @return tl::expected indicating operation status. 
+ */ + tl::expected BatchGet( + const std::string& transfer_engine_addr, + const std::vector& keys, + const std::vector& pointers, + const std::vector& sizes); + + private: + friend class FileStorageTest; + struct AllocatedBatch { + std::vector handles; + std::unordered_map slices; + + AllocatedBatch() = default; + AllocatedBatch(AllocatedBatch&&) = default; + AllocatedBatch& operator=(AllocatedBatch&&) = default; + + AllocatedBatch(const AllocatedBatch&) = delete; + AllocatedBatch& operator=(const AllocatedBatch&) = delete; + + ~AllocatedBatch() = default; + }; + + /** + * @brief Offload object data and metadata. + * @return tl::expected indicating operation status. + */ + tl::expected OffloadObjects( + const std::unordered_map& offloading_objects); + + /** + * @brief Groups offloading keys into buckets based on size and existence + * checks. + * @param offloading_objects Input map of object keys and their sizes + * (e.g., byte size). + * @param buckets_keys Output parameter: receives a 2D vector where: + * - Each outer element represents a bucket. + * - Each inner vector contains the newly allocated + * object keys within that bucket. + * @return tl::expected indicating operation status. + */ + tl::expected GroupOffloadingKeysByBucket( + const std::unordered_map& offloading_objects, + std::vector>& buckets_keys); + + /** + * @brief Performs a heartbeat operation for the FileStorage component. + * 1. Sends object status (e.g., access frequency, size) to the master via + * client. + * 2. Receives feedback on which objects should be offloaded. + * 3. Triggers asynchronous offloading of pending objects. + * @return tl::expected indicating operation status. + */ + tl::expected Heartbeat(); + + tl::expected IsEnableOffloading(); + + tl::expected BatchOffload( + const std::vector& keys); + + tl::expected BatchLoad( + const std::unordered_map& batch_object); + + tl::expected BatchQuerySegmentSlices( + const std::vector& keys, + std::unordered_map>& batched_slices); + + tl::expected RegisterLocalMemory(); + + tl::expected AllocateBatch( + const std::vector& keys, + const std::vector& sizes); + + std::shared_ptr client_; + std::string segment_name_; + std::string local_rpc_addr_; + FileStorageConfig config_; + std::shared_ptr storage_backend_; + std::shared_ptr client_buffer_allocator_; + BucketIterator sync_stat_bucket_iterator_; + + mutable Mutex offloading_mutex_; + std::unordered_map GUARDED_BY(offloading_mutex_) + ungrouped_offloading_objects_; + bool GUARDED_BY(offloading_mutex_) enable_offloading_; + std::atomic heartbeat_running_; + std::thread heartbeat_thread_; +}; + +} // namespace mooncake \ No newline at end of file diff --git a/mooncake-store/include/storage_backend.h b/mooncake-store/include/storage_backend.h index b58d60b47..dcd7ae26f 100644 --- a/mooncake-store/include/storage_backend.h +++ b/mooncake-store/include/storage_backend.h @@ -1,24 +1,18 @@ #pragma once #include + +#include #include #include #include -#include -#include "mutex.h" -#include "types.h" #include "file_interface.h" +#include "mutex.h" +#include "types.h" namespace mooncake { -struct StorageObjectMetadata { - int64_t bucket_id; - int64_t offset; - int64_t key_size; - int64_t data_size; -}; - struct BucketObjectMetadata { int64_t offset; int64_t key_size; @@ -29,14 +23,16 @@ YLT_REFL(BucketObjectMetadata, offset, key_size, data_size); struct BucketMetadata { int64_t meta_size; int64_t data_size; - std::unordered_map object_metadata; std::vector keys; + std::vector metadatas; }; -YLT_REFL(BucketMetadata, 
data_size, object_metadata, keys); +YLT_REFL(BucketMetadata, data_size, keys, metadatas); struct OffloadMetadata { int64_t total_keys; int64_t total_size; + OffloadMetadata(std::size_t keys, int64_t size) + : total_keys(keys), total_size(size) {} }; enum class FileMode { Read, Write }; @@ -237,8 +233,9 @@ class BucketStorageBackend { */ tl::expected BatchOffload( const std::unordered_map>& batch_object, - std::function&)> + std::function< + ErrorCode(const std::vector& keys, + const std::vector& metadatas)> complete_handler); /** @@ -261,7 +258,7 @@ class BucketStorageBackend { * @return tl::expected indicating operation status. */ tl::expected BatchLoad( - std::unordered_map& batched_slices); + const std::unordered_map& batched_slices); /** * @brief Retrieves the list of object keys belonging to a specific bucket. @@ -291,18 +288,19 @@ class BucketStorageBackend { * @brief Iterate over the metadata of stored objects starting from a * specified bucket. * @param bucket_id The ID of the bucket to start scanning from. - * @param objects Output parameter: a map from object key to its metadata. - * @param buckets Output parameter: a list of bucket IDs encountered during - * iteration. - * @param limit Maximum number of objects to return in this iteration. + * @param keys Output vector to receive object keys + * @param metadatas Output vector to receive object metadata + * @param buckets Output vector to receive corresponding destination + * bucket IDs + * @param limit Maximum number of entries to return in this call * @return tl::expected * - On success: the bucket ID where the next iteration should start (or 0 * if all data has been scanned). * - On failure: returns an error code (e.g., BUCKET_NOT_FOUND, IO_ERROR). */ tl::expected BucketScan( - int64_t bucket_id, - std::unordered_map& objects, + int64_t bucket_id, std::vector& keys, + std::vector& metadatas, std::vector& buckets, int64_t limit); /** @@ -314,8 +312,10 @@ class BucketStorageBackend { private: tl::expected, ErrorCode> BuildBucket( + int64_t bucket_id, const std::unordered_map>& batch_object, - std::vector& iovs); + std::vector& iovs, + std::vector& metadatas); tl::expected WriteBucket( int64_t bucket_id, std::shared_ptr bucket_metadata, @@ -329,7 +329,8 @@ class BucketStorageBackend { tl::expected BatchLoadBucket( int64_t bucket_id, const std::vector& keys, - std::unordered_map& batched_slices); + const std::vector& metadatas, + const std::unordered_map& batched_slices); tl::expected CreateBucketId(); diff --git a/mooncake-store/include/types.h b/mooncake-store/include/types.h index 100cc13de..fb2c0dbc2 100644 --- a/mooncake-store/include/types.h +++ b/mooncake-store/include/types.h @@ -157,7 +157,8 @@ enum class ErrorCode : int32_t { BUCKET_NOT_FOUND = -1200, ///< Bucket not found. BUCKET_ALREADY_EXISTS = -1201, ///< Bucket already exists. - KEYS_ULTRA_BUCKET_LIMIT = -1202, ///< Keys ultra bucket limit. + KEYS_EXCEED_BUCKET_LIMIT = -1202, ///< Keys exceed bucket limit. + KEYS_ULTRA_LIMIT = -1203, ///< Keys ultra limit. 
UNABLE_OFFLOAD = -1300, ///< The offload functionality is not enabled }; @@ -240,4 +241,12 @@ inline std::ostream& operator<<(std::ostream& os, return os; } +struct StorageObjectMetadata { + int64_t bucket_id; + int64_t offset; + int64_t key_size; + int64_t data_size; + YLT_REFL(StorageObjectMetadata, bucket_id, offset, key_size, data_size); +}; + } // namespace mooncake diff --git a/mooncake-store/src/CMakeLists.txt b/mooncake-store/src/CMakeLists.txt index 7d6300909..e8cc7ed41 100644 --- a/mooncake-store/src/CMakeLists.txt +++ b/mooncake-store/src/CMakeLists.txt @@ -24,6 +24,7 @@ set(MOONCAKE_STORE_SOURCES client_buffer.cpp pybind_client.cpp http_metadata_server.cpp + file_storage.cpp ) set(EXTRA_LIBS "") diff --git a/mooncake-store/src/client.cpp b/mooncake-store/src/client.cpp index 263d8cbf4..607466821 100644 --- a/mooncake-store/src/client.cpp +++ b/mooncake-store/src/client.cpp @@ -1523,6 +1523,36 @@ std::vector> Client::BatchIsExist( return response; } +tl::expected Client::MountFileStorage( + const std::string& segment_name, const std::string& local_rpc_addr, + bool enable_offloading) { + // TODO: Implement this function + return {}; +} + +tl::expected Client::OffloadObjectHeartbeat( + const std::string& segment_name, bool enable_offloading, + std::unordered_map& offloading_objects) { + // TODO: Implement this function + return {}; +} + +tl::expected Client::BatchPutOffloadObject( + const std::string& transfer_engine_addr, + const std::vector& keys, + const std::vector& pointers, + const std::unordered_map& batched_slices) { + // TODO: Implement this function + return {}; +} + +tl::expected Client::NotifyOffloadSuccess( + const std::string& segment_name, const std::vector& keys, + const std::vector& metadatas) { + // TODO: Implement this function + return {}; +} + void Client::PrepareStorageBackend(const std::string& storage_root_dir, const std::string& fsdir) { // Initialize storage backend diff --git a/mooncake-store/src/file_storage.cpp b/mooncake-store/src/file_storage.cpp new file mode 100644 index 000000000..612a6a156 --- /dev/null +++ b/mooncake-store/src/file_storage.cpp @@ -0,0 +1,618 @@ +#include "file_storage.h" + +#include +#include +#include + +#include "utils.h" +namespace mooncake { + +// Helper: Get integer from environment variable, fallback to default +template +T FileStorageConfig::GetEnvOr(const char* name, T default_value) { + const char* env_val = std::getenv(name); + if (!env_val || std::string(env_val).empty()) { + return default_value; + } + try { + long long value = std::stoll(env_val); + // Check range for unsigned types + if constexpr (std::is_same_v) { + if (value < 0 || value > UINT32_MAX) throw std::out_of_range(""); + } + return static_cast(value); + } catch (...) { + return default_value; + } +} + +// Helper: Get string from environment variable, fallback to default +std::string FileStorageConfig::GetEnvStringOr( + const char* name, const std::string& default_value) { + const char* env_val = std::getenv(name); + return env_val ? 
std::string(env_val) : default_value; +} + +FileStorageConfig FileStorageConfig::FromEnvironment() { + FileStorageConfig config; + + config.storage_filepath = + GetEnvStringOr("FILE_STORAGE_PATH", config.storage_filepath); + + config.local_buffer_size = + GetEnvOr("LOCAL_BUFFER_SIZE_BYTES", config.local_buffer_size); + + config.bucket_iterator_keys_limit = GetEnvOr( + "BUCKET_ITERATOR_KEYS_LIMIT", config.bucket_iterator_keys_limit); + + config.bucket_keys_limit = + GetEnvOr("BUCKET_KEYS_LIMIT", config.bucket_keys_limit); + + config.bucket_size_limit = + GetEnvOr("BUCKET_SIZE_LIMIT_BYTES", config.bucket_size_limit); + + config.total_keys_limit = + GetEnvOr("TOTAL_KEYS_LIMIT", config.total_keys_limit); + + config.total_size_limit = + GetEnvOr("TOTAL_SIZE_LIMIT_BYTES", config.total_size_limit); + + config.heartbeat_interval_seconds = GetEnvOr( + "HEARTBEAT_INTERVAL_SECONDS", config.heartbeat_interval_seconds); + + return config; +} + +bool FileStorageConfig::Validate() const { + if (storage_filepath.empty()) { + LOG(ERROR) << "FileStorageConfig: storage_filepath is invalid"; + return false; + } + const std::string& path = storage_filepath; + namespace fs = std::filesystem; + // 1. Must be an absolute path + if (!fs::path(path).is_absolute()) { + LOG(ERROR) + << "FileStorageConfig: storage_filepath must be an absolute path: " + << path; + return false; + } + + // 2. Check if the path contains ".." components that could lead to path + // traversal (static check) + fs::path p(path); + for (const auto& component : p) { + if (component == "..") { + LOG(ERROR) << "FileStorageConfig: path traversal is not allowed: " + << path; + return false; + } + } + + struct stat stat_buf; + + // 3. Use stat() to check if the path exists + if (::stat(path.c_str(), &stat_buf) != 0) { + LOG(ERROR) << "FileStorageConfig: storage_filepath does not exist: " + << path; + return false; + } + // Path exists — check if it is a directory + if (!S_ISDIR(stat_buf.st_mode)) { + LOG(ERROR) << "FileStorageConfig: storage_filepath is not a directory: " + << path; + return false; + } + + // (Optional) Check write permission + if (::access(path.c_str(), W_OK) != 0) { + LOG(ERROR) << "FileStorageConfig: no write permission on directory: " + << path; + return false; + } + + // 4. 
Additional security: prevent symlink bypass (optional) + // Use lstat to avoid automatic dereferencing of symbolic links + struct stat lstat_buf; + if (::lstat(path.c_str(), &lstat_buf) == 0) { + if (S_ISLNK(lstat_buf.st_mode)) { + LOG(ERROR) << "FileStorageConfig: symbolic link is not allowed: " + << path; + return false; + } + } + if (bucket_keys_limit <= 0) { + LOG(ERROR) << "FileStorageConfig: bucket_keys_limit must > 0"; + return false; + } + if (bucket_size_limit <= 0) { + LOG(ERROR) << "FileStorageConfig: bucket_size_limit must > 0"; + return false; + } + if (total_keys_limit <= 0) { + LOG(ERROR) << "FileStorageConfig: total_keys_limit must > 0"; + return false; + } + if (total_size_limit == 0) { + LOG(ERROR) << "FileStorageConfig: total_size_limit should not be zero"; + return false; + } + if (heartbeat_interval_seconds <= 0) { + LOG(ERROR) << "FileStorageConfig: heartbeat_interval_seconds must > 0"; + return false; + } + return true; +} + +FileStorage::FileStorage(std::shared_ptr client, + const std::string& segment_name, + const std::string& local_rpc_addr, + const FileStorageConfig& config) + : client_(client), + segment_name_(segment_name), + local_rpc_addr_(local_rpc_addr), + config_(config), + storage_backend_( + std::make_shared(config.storage_filepath)), + client_buffer_allocator_( + ClientBufferAllocator::create(config.local_buffer_size, "")), + sync_stat_bucket_iterator_(storage_backend_, + config.bucket_iterator_keys_limit) { + if (!config.Validate()) { + throw std::invalid_argument("Invalid FileStorage configuration"); + } +} + +FileStorage::~FileStorage() { + LOG(INFO) << "Shutdown FileStorage..."; + heartbeat_running_ = false; + if (heartbeat_thread_.joinable()) { + heartbeat_thread_.join(); + } +} + +tl::expected FileStorage::Init() { + auto register_memory_result = RegisterLocalMemory(); + if (!register_memory_result) { + LOG(ERROR) << "Failed to register local memory: " + << register_memory_result.error(); + return register_memory_result; + } + auto init_storage_backend_result = storage_backend_->Init(); + if (!init_storage_backend_result) { + LOG(ERROR) << "Failed to init storage backend: " + << init_storage_backend_result.error(); + return init_storage_backend_result; + } + auto enable_offloading_result = IsEnableOffloading(); + if (!enable_offloading_result) { + LOG(ERROR) << "Failed to get enable persist result, error : " + << enable_offloading_result.error(); + return tl::make_unexpected(enable_offloading_result.error()); + } + { + MutexLocker locker(&offloading_mutex_); + enable_offloading_ = enable_offloading_result.value(); + auto mount_file_storage_result = client_->MountFileStorage( + segment_name_, local_rpc_addr_, enable_offloading_); + if (!mount_file_storage_result) { + LOG(ERROR) << "Failed to mount file storage: " + << mount_file_storage_result.error(); + return mount_file_storage_result; + } + } + + BucketIterator bucket_iterator(storage_backend_, + config_.bucket_iterator_keys_limit); + while (true) { + auto has_next_res = bucket_iterator.HasNext(); + if (!has_next_res) { + LOG(ERROR) << "Failed to check for next bucket: " + << has_next_res.error(); + return tl::make_unexpected(has_next_res.error()); + } + if (!has_next_res.value()) { + break; + } + auto add_all_object_res = bucket_iterator.HandleNext( + [this](const std::vector& keys, + const std::vector& metadatas, + const std::vector&) { + auto add_object_result = client_->NotifyOffloadSuccess( + segment_name_, keys, metadatas); + if (!add_object_result) { + LOG(ERROR) << "Failed to add object 
to master: " + << add_object_result.error(); + return add_object_result.error(); + } + return ErrorCode::OK; + }); + if (!add_all_object_res) { + LOG(ERROR) << "Failed to add all object to master: " + << add_all_object_res.error(); + return add_all_object_res; + } + } + + heartbeat_running_.store(true); + heartbeat_thread_ = std::thread([this]() { + LOG(INFO) << "Starting periodic task with interval: " + << config_.heartbeat_interval_seconds + << "s, running is: " << heartbeat_running_.load(); + while (heartbeat_running_.load()) { + Heartbeat(); + std::this_thread::sleep_for( + std::chrono::seconds(config_.heartbeat_interval_seconds)); + } + }); + return {}; +} + +tl::expected FileStorage::BatchGet( + const std::string& transfer_engine_addr, + const std::vector& keys, + const std::vector& pointers, const std::vector& sizes) { + auto start_time = std::chrono::steady_clock::now(); + auto allocate_res = AllocateBatch(keys, sizes); + if (!allocate_res) { + LOG(ERROR) << "Failed to allocate batch objects, target = " + << transfer_engine_addr; + return tl::make_unexpected(allocate_res.error()); + } + auto result = BatchLoad(allocate_res.value().slices); + if (!result) { + LOG(ERROR) << "Batch load object failed,err_code = " << result.error(); + return result; + } + auto batch_put_result = client_->BatchPutOffloadObject( + transfer_engine_addr, keys, pointers, allocate_res.value().slices); + + auto end_time = std::chrono::steady_clock::now(); + auto elapsed_time = std::chrono::duration_cast( + end_time - start_time) + .count(); + VLOG(1) << "Time taken for FileStorage::BatchGet: " << elapsed_time + << "us,with transfer_engine_addr: " << transfer_engine_addr + << ", key size: " << keys.size(); + if (!batch_put_result) { + LOG(ERROR) << "Batch write offload object failed,err_code = " + << batch_put_result.error(); + return batch_put_result; + } + return {}; +} + +tl::expected FileStorage::OffloadObjects( + const std::unordered_map& offloading_objects) { + std::vector> buckets_keys; + auto allocate_objects_result = + GroupOffloadingKeysByBucket(offloading_objects, buckets_keys); + if (!allocate_objects_result) { + LOG(ERROR) << "GroupKeysByBucket failed with error: " + << allocate_objects_result.error(); + return allocate_objects_result; + } + for (const auto& keys : buckets_keys) { + auto enable_offloading_result = IsEnableOffloading(); + if (!enable_offloading_result) { + LOG(ERROR) << "Get is enable offloading failed with error: " + << enable_offloading_result.error(); + return tl::make_unexpected(enable_offloading_result.error()); + } + if (!enable_offloading_result.value()) { + LOG(WARNING) << "Unable to be persisted"; + MutexLocker locker(&offloading_mutex_); + ungrouped_offloading_objects_.clear(); + enable_offloading_ = false; + return tl::make_unexpected(ErrorCode::KEYS_ULTRA_LIMIT); + } + auto result = BatchOffload(keys); + if (!result) { + LOG(ERROR) << "Failed to store objects with error: " + << result.error(); + if (result.error() != ErrorCode::INVALID_READ) { + return result; + } + } + } + return {}; +} + +tl::expected FileStorage::IsEnableOffloading() { + auto store_metadata_result = storage_backend_->GetStoreMetadata(); + if (!store_metadata_result) { + LOG(ERROR) << "Failed to get store metadata: " + << store_metadata_result.error(); + return tl::make_unexpected(store_metadata_result.error()); + } + const auto& store_metadata = store_metadata_result.value(); + auto enable_offloading = + store_metadata.total_keys + config_.bucket_keys_limit <= + config_.total_keys_limit && + 
store_metadata.total_size + config_.bucket_size_limit <= + config_.total_size_limit; + + VLOG(1) << (enable_offloading ? "Enable" : "Unable") + << " offloading,total keys: " << store_metadata.total_keys + << ", bucket keys limit: " << config_.bucket_keys_limit + << ", total keys limit: " << config_.total_keys_limit + << ", total size: " << store_metadata.total_size + << ", bucket size limit: " << config_.bucket_size_limit + << ", total size limit: " << config_.total_size_limit; + + return enable_offloading; +} + +tl::expected FileStorage::Heartbeat() { + if (client_ == nullptr) { + LOG(ERROR) << "client is nullptr"; + return tl::make_unexpected(ErrorCode::INVALID_PARAMS); + } + std::unordered_map + offloading_objects; // Objects selected for offloading + + // === STEP 1: Send heartbeat and get offloading decisions === + { + MutexLocker locker(&offloading_mutex_); + auto heartbeat_result = client_->OffloadObjectHeartbeat( + segment_name_, enable_offloading_, offloading_objects); + if (!heartbeat_result) { + LOG(ERROR) << "Failed to send heartbeat with error: " + << heartbeat_result.error(); + return heartbeat_result; + } + } + + // === STEP 2: Persist offloaded objects (trigger actual data migration) === + auto offload_result = OffloadObjects(offloading_objects); + if (!offload_result) { + LOG(ERROR) << "Failed to persist objects with error: " + << offload_result.error(); + return offload_result; + } + + // TODO(eviction): Implement an LRU eviction mechanism to manage local + // storage capacity. + return {}; +} + +tl::expected FileStorage::BatchOffload( + const std::vector& keys) { + auto start_time = std::chrono::steady_clock::now(); + std::unordered_map> batch_object; + auto query_result = BatchQuerySegmentSlices(keys, batch_object); + if (!query_result) { + LOG(ERROR) << "BatchQuerySlices failed with error: " + << query_result.error(); + return tl::make_unexpected(ErrorCode::INVALID_READ); + } + auto result = storage_backend_->BatchOffload( + batch_object, + [this](const std::vector& keys, + const std::vector& metadatas) { + VLOG(1) << "Success to store objects, keys count: " << keys.size(); + auto result = + client_->NotifyOffloadSuccess(segment_name_, keys, metadatas); + if (!result) { + LOG(ERROR) << "NotifyOffloadSuccess failed with error: " + << result.error(); + return result.error(); + } + return ErrorCode::OK; + }); + auto end_time = std::chrono::steady_clock::now(); + auto elapsed_time = std::chrono::duration_cast( + end_time - start_time) + .count(); + VLOG(1) << "Time taken for BatchStore: " << elapsed_time + << "us,with keys count: " << keys.size(); + if (!result) { + LOG(ERROR) << "Batch store object failed, err_code = " + << result.error(); + return tl::make_unexpected(result.error()); + } + return {}; +} + +tl::expected FileStorage::BatchLoad( + const std::unordered_map& batch_object) { + auto start_time = std::chrono::steady_clock::now(); + auto result = storage_backend_->BatchLoad(batch_object); + auto end_time = std::chrono::steady_clock::now(); + auto elapsed_time = std::chrono::duration_cast( + end_time - start_time) + .count(); + VLOG(1) << "Time taken for BatchStore: " << elapsed_time + << "us,with keys count: " << batch_object.size(); + if (!result) { + LOG(ERROR) << "Batch load object failed,err_code = " << result.error(); + } + return result; +} + +tl::expected FileStorage::BatchQuerySegmentSlices( + const std::vector& keys, + std::unordered_map>& batched_slices) { + auto batched_query_results = client_->BatchQuery(keys); + if (batched_query_results.empty()) + 
return tl::make_unexpected(ErrorCode::INVALID_REPLICA); + for (size_t i = 0; i < batched_query_results.size(); ++i) { + if (batched_query_results[i]) { + for (const auto& descriptor : + batched_query_results[i].value().replicas) { + if (descriptor.is_memory_replica()) { + std::vector slices; + const auto& memory_descriptor = + descriptor.get_memory_descriptor(); + if (memory_descriptor.buffer_descriptor + .transport_endpoint_ != segment_name_) { + break; + } + void* slice_ptr = reinterpret_cast( + memory_descriptor.buffer_descriptor.buffer_address_); + slices.emplace_back(Slice{ + slice_ptr, memory_descriptor.buffer_descriptor.size_}); + batched_slices.insert({keys[i], std::move(slices)}); + break; + } + } + if (batched_slices.find(keys[i]) == batched_slices.end()) { + LOG(ERROR) << "Key not found: " << keys[i]; + return tl::make_unexpected(ErrorCode::INVALID_KEY); + } + } else { + LOG(ERROR) << "Key not found: " << keys[i]; + return tl::make_unexpected(batched_query_results[i].error()); + } + } + return {}; +} + +tl::expected FileStorage::RegisterLocalMemory() { + auto error_code = client_->RegisterLocalMemory( + client_buffer_allocator_->getBase(), config_.local_buffer_size, + kWildcardLocation, false, false); + if (!error_code) { + LOG(ERROR) << "Failed to register local memory: " << error_code.error(); + return error_code; + } + return {}; +} + +tl::expected FileStorage::AllocateBatch( + const std::vector& keys, const std::vector& sizes) { + AllocatedBatch result; + for (size_t i = 0; i < keys.size(); ++i) { + assert(sizes[i] <= kMaxSliceSize); + auto alloc_result = client_buffer_allocator_->allocate(sizes[i]); + if (!alloc_result) { + LOG(ERROR) << "Failed to allocate slice buffer, size = " << sizes[i] + << ", key = " << keys[i]; + return tl::make_unexpected(ErrorCode::BUFFER_OVERFLOW); + } + result.slices.emplace( + keys[i], Slice{alloc_result->ptr(), static_cast(sizes[i])}); + result.handles.emplace_back(std::move(alloc_result.value())); + } + return result; +} + +tl::expected FileStorage::GroupOffloadingKeysByBucket( + const std::unordered_map& offloading_objects, + std::vector>& buckets_keys) { + MutexLocker locker(&offloading_mutex_); + auto it = offloading_objects.cbegin(); + int64_t residue_count = + offloading_objects.size() + ungrouped_offloading_objects_.size(); + int64_t total_count = + offloading_objects.size() + ungrouped_offloading_objects_.size(); + while (it != offloading_objects.cend()) { + std::vector bucket_keys; + std::unordered_map bucket_objects; + int64_t bucket_data_size = 0; + // Process previously ungrouped objects first + if (!ungrouped_offloading_objects_.empty()) { + for (const auto& ungrouped_objects_it : + ungrouped_offloading_objects_) { + bucket_data_size += ungrouped_objects_it.second; + bucket_keys.push_back(ungrouped_objects_it.first); + bucket_objects.emplace(ungrouped_objects_it.first, + ungrouped_objects_it.second); + } + VLOG(1) << "Ungrouped offloading objects have been processed and " + "cleared; count=" + << ungrouped_offloading_objects_.size(); + ungrouped_offloading_objects_.clear(); + } + + // Fill the rest of the bucket with new offloading objects + for (size_t i = bucket_keys.size(); i < config_.bucket_keys_limit; + ++i) { + if (it == offloading_objects.cend()) { + // No more objects to add — move current batch to ungrouped pool + for (const auto& bucket_object : bucket_objects) { + ungrouped_offloading_objects_.emplace(bucket_object.first, + bucket_object.second); + } + VLOG(1) << "Add offloading objects to ungrouped pool. 
" + << "Total ungrouped count: " + << ungrouped_offloading_objects_.size(); + return {}; + } + if (it->second > config_.bucket_size_limit) { + LOG(ERROR) << "Object size exceeds bucket size limit: " + << "key=" << it->first + << ", object_size=" << it->second + << ", limit=" << config_.bucket_size_limit; + ++it; + continue; + } + auto is_exist_result = storage_backend_->IsExist(it->first); + if (!is_exist_result) { + LOG(ERROR) << "Failed to check existence in storage backend: " + << "key=" << it->first + << ", error=" << is_exist_result.error(); + } + if (is_exist_result.value()) { + ++it; + continue; + } + if (bucket_data_size + it->second > config_.bucket_size_limit) { + break; + } + bucket_data_size += it->second; + bucket_keys.push_back(it->first); + bucket_objects.emplace(it->first, it->second); + ++it; + if (bucket_data_size == config_.bucket_size_limit) { + break; + } + } + auto bucket_keys_count = bucket_keys.size(); + // Finalize current bucket + residue_count -= bucket_keys_count; + buckets_keys.push_back(std::move(bucket_keys)); + VLOG(1) << "Group objects with total object count: " << total_count + << ", current bucket object count: " << bucket_keys_count + << ", current bucket data size: " << bucket_data_size + << ", grouped bucket count: " << buckets_keys.size() + << ", residue object count: " << residue_count; + } + return {}; +} + +BucketIterator::BucketIterator( + std::shared_ptr storage_backend, int64_t limit) + : storage_backend_(storage_backend), limit_(limit) {}; + +tl::expected BucketIterator::HandleNext( + const std::function< + ErrorCode(const std::vector& keys, + const std::vector& metadatas, + const std::vector& buckets)>& handler) { + MutexLocker locker(&mutex_); + std::vector keys; + std::vector metadatas; + std::vector buckets; + auto key_iterator_result = storage_backend_->BucketScan( + next_bucket_, keys, metadatas, buckets, limit_); + if (!key_iterator_result) { + LOG(ERROR) << "Bucket scan failed, error : " + << key_iterator_result.error(); + return tl::make_unexpected(key_iterator_result.error()); + } + auto handle_result = handler(keys, metadatas, buckets); + if (handle_result != ErrorCode::OK) { + LOG(ERROR) << "Key iterator failed, error : " << handle_result; + return tl::make_unexpected(handle_result); + } + next_bucket_ = key_iterator_result.value(); + return {}; +} + +tl::expected BucketIterator::HasNext() { + MutexLocker locker(&mutex_); + return next_bucket_ != 0; +} + +} // namespace mooncake \ No newline at end of file diff --git a/mooncake-store/src/storage_backend.cpp b/mooncake-store/src/storage_backend.cpp index acbd1fa4d..f147d363f 100644 --- a/mooncake-store/src/storage_backend.cpp +++ b/mooncake-store/src/storage_backend.cpp @@ -2,12 +2,14 @@ #include #include + +#include #include #include -#include #include -#include "utils.h" + #include "mutex.h" +#include "utils.h" namespace mooncake { @@ -331,12 +333,13 @@ BucketStorageBackend::BucketStorageBackend(const std::string& storage_path) tl::expected BucketStorageBackend::BatchOffload( const std::unordered_map>& batch_object, std::function< - ErrorCode(const std::unordered_map&)> + ErrorCode(const std::vector& keys, + const std::vector& metadatas)> complete_handler) { if (!initialized_.load(std::memory_order_acquire)) { LOG(ERROR) << "Storage backend is not initialized. 
Call Init() before use."; - return tl::unexpected(ErrorCode::INTERNAL_ERROR); + return tl::make_unexpected(ErrorCode::INTERNAL_ERROR); } if (batch_object.empty()) { LOG(ERROR) << "batch object is empty"; @@ -344,7 +347,9 @@ tl::expected BucketStorageBackend::BatchOffload( } auto bucket_id = bucket_id_generator_->NextId(); std::vector iovs; - auto build_bucket_result = BuildBucket(batch_object, iovs); + std::vector metadatas; + auto build_bucket_result = + BuildBucket(bucket_id, batch_object, iovs, metadatas); if (!build_bucket_result) { LOG(ERROR) << "Failed to build bucket with id: " << bucket_id; return tl::make_unexpected(build_bucket_result.error()); @@ -356,20 +361,19 @@ tl::expected BucketStorageBackend::BatchOffload( return tl::make_unexpected(write_bucket_result.error()); } if (complete_handler != nullptr) { - auto error_code = complete_handler(bucket->object_metadata); + auto error_code = complete_handler(bucket->keys, metadatas); if (error_code != ErrorCode::OK) { - LOG(ERROR) << "Sync Store object failed,err_code = " << error_code; + LOG(ERROR) << "Complete handler failed: " << error_code + << ", Key count: " << bucket->keys.size() + << ", Bucket id: " << bucket_id; return tl::make_unexpected(error_code); } } SharedMutexLocker lock(&mutex_); total_size_ += bucket->data_size + bucket->meta_size; - for (auto object_metadata_it : bucket->object_metadata) { - object_bucket_map_.emplace( - object_metadata_it.first, - StorageObjectMetadata{bucket_id, object_metadata_it.second.offset, - object_metadata_it.second.key_size, - object_metadata_it.second.data_size}); + object_bucket_map_.reserve(object_bucket_map_.size() + bucket->keys.size()); + for (size_t i = 0; i < bucket->keys.size(); ++i) { + object_bucket_map_.emplace(bucket->keys[i], std::move(metadatas[i])); } buckets_.emplace(bucket_id, std::move(bucket)); return bucket_id; @@ -393,8 +397,10 @@ tl::expected BucketStorageBackend::BatchQuery( } tl::expected BucketStorageBackend::BatchLoad( - std::unordered_map& batch_object) { - std::unordered_map> bucket_key_map; + const std::unordered_map& batch_object) { + std::unordered_map> bucket_keys_map; + std::unordered_map> + bucket_key_metas_map; { SharedMutexLocker lock(&mutex_, shared_lock); for (const auto& key_it : batch_object) { @@ -403,16 +409,21 @@ tl::expected BucketStorageBackend::BatchLoad( LOG(ERROR) << "key " << key_it.first << " does not exist"; return tl::make_unexpected(ErrorCode::INVALID_KEY); } - auto [bucket_keys_it, _] = - bucket_key_map.try_emplace(object_bucket_it->second.bucket_id); + auto [bucket_keys_it, create_keys_it] = + bucket_keys_map.try_emplace(object_bucket_it->second.bucket_id); bucket_keys_it->second.emplace_back(key_it.first); + auto [bucket_key_metas_it, create_metas_it] = + bucket_key_metas_map.try_emplace( + object_bucket_it->second.bucket_id); + bucket_key_metas_it->second.emplace_back(object_bucket_it->second); } } - for (const auto& bucket_key_it : bucket_key_map) { - auto result = BatchLoadBucket(bucket_key_it.first, bucket_key_it.second, - batch_object); + for (const auto& bucket_keys_it : bucket_keys_map) { + auto result = BatchLoadBucket( + bucket_keys_it.first, bucket_keys_it.second, + bucket_key_metas_map.at(bucket_keys_it.first), batch_object); if (!result) { - LOG(ERROR) << "Failed to load bucket " << bucket_key_it.first; + LOG(ERROR) << "Failed to load bucket " << bucket_keys_it.first; return result; } } @@ -437,7 +448,7 @@ tl::expected BucketStorageBackend::Init() { try { if (initialized_.load(std::memory_order_acquire)) { LOG(ERROR) << 
"Storage backend already initialized"; - return tl::unexpected(ErrorCode::INTERNAL_ERROR); + return tl::make_unexpected(ErrorCode::INTERNAL_ERROR); } SharedMutexLocker lock(&mutex_); object_bucket_map_.clear(); @@ -448,13 +459,14 @@ tl::expected BucketStorageBackend::Init() { fs::recursive_directory_iterator(storage_path_)) { if (entry.is_regular_file() && entry.path().extension() == BUCKET_METADATA_FILE_SUFFIX) { - auto bucket_id_str = entry.path().stem(); + const auto& bucket_id_str = entry.path().stem(); int64_t bucket_id = std::stoll(bucket_id_str); auto [metadata_it, success] = buckets_.try_emplace( bucket_id, std::make_shared()); if (!success) { LOG(ERROR) << "Failed to load bucket " << bucket_id_str; - return tl::unexpected(ErrorCode::BUCKET_ALREADY_EXISTS); + return tl::make_unexpected( + ErrorCode::BUCKET_ALREADY_EXISTS); } auto load_bucket_metadata_result = LoadBucketMetadata(bucket_id, metadata_it->second); @@ -480,7 +492,7 @@ tl::expected BucketStorageBackend::Init() { } auto& meta = *(metadata_it->second); if (meta.data_size == 0 || meta.meta_size == 0 || - meta.object_metadata.empty() || meta.keys.empty()) { + meta.metadatas.empty() || meta.keys.empty()) { LOG(ERROR) << "Metadata validation failed for bucket: " << bucket_id_str << ", will delete the bucket's data and " @@ -490,10 +502,9 @@ tl::expected BucketStorageBackend::Init() { LOG(ERROR) << " meta_size: " << meta.meta_size << " (should not be 0)"; LOG(ERROR) - << " object_metadata.size(): " - << meta.object_metadata.size() << " (empty: " - << (meta.object_metadata.empty() ? "true" : "false") - << ")"; + << " object_metadata.size(): " << meta.metadatas.size() + << " (empty: " + << (meta.metadatas.empty() ? "true" : "false") << ")"; LOG(ERROR) << " keys.size(): " << meta.keys.size() @@ -518,15 +529,14 @@ tl::expected BucketStorageBackend::Init() { } total_size_ += metadata_it->second->data_size + metadata_it->second->meta_size; - for (const auto& object_metadata_it : - metadata_it->second->object_metadata) { + for (size_t i = 0; i < metadata_it->second->keys.size(); i++) { object_bucket_map_.emplace( - object_metadata_it.first, + metadata_it->second->keys[i], StorageObjectMetadata{ metadata_it->first, - object_metadata_it.second.offset, - object_metadata_it.second.key_size, - object_metadata_it.second.data_size}); + metadata_it->second->metadatas[i].offset, + metadata_it->second->metadatas[i].key_size, + metadata_it->second->metadatas[i].data_size}); } } } @@ -543,7 +553,7 @@ tl::expected BucketStorageBackend::Init() { } catch (const std::exception& e) { LOG(ERROR) << "Bucket storage backend initialize error: " << e.what() << std::endl; - return tl::unexpected(ErrorCode::INTERNAL_ERROR); + return tl::make_unexpected(ErrorCode::INTERNAL_ERROR); } return {}; @@ -560,8 +570,8 @@ tl::expected BucketStorageBackend::IsExist( } tl::expected BucketStorageBackend::BucketScan( - int64_t bucket_id, - std::unordered_map& objects, + int64_t bucket_id, std::vector& keys, + std::vector& metadatas, std::vector& buckets, int64_t limit) { SharedMutexLocker lock(&mutex_, shared_lock); auto bucket_it = buckets_.lower_bound(bucket_id); @@ -571,14 +581,18 @@ tl::expected BucketStorageBackend::BucketScan( << "bucket_id=" << bucket_it->first << ", current_size=" << bucket_it->second->keys.size() << ", limit=" << limit; - return tl::make_unexpected(ErrorCode::KEYS_ULTRA_BUCKET_LIMIT); + return tl::make_unexpected(ErrorCode::KEYS_EXCEED_BUCKET_LIMIT); } - if (bucket_it->second->keys.size() + objects.size() > limit) { + if 
(bucket_it->second->keys.size() + keys.size() > limit) { return bucket_it->first; } buckets.emplace_back(bucket_it->first); - for (const auto& object_it : bucket_it->second->object_metadata) { - objects.emplace(object_it.first, object_it.second); + for (size_t i = 0; i < bucket_it->second->keys.size(); i++) { + keys.emplace_back(bucket_it->second->keys[i]); + metadatas.emplace_back(StorageObjectMetadata{ + bucket_it->first, bucket_it->second->metadatas[i].offset, + bucket_it->second->metadatas[i].key_size, + bucket_it->second->metadatas[i].data_size}); } } return 0; @@ -587,14 +601,15 @@ tl::expected BucketStorageBackend::BucketScan( tl::expected BucketStorageBackend::GetStoreMetadata() { SharedMutexLocker lock(&mutex_, shared_lock); - OffloadMetadata metadata{object_bucket_map_.size(), total_size_}; + OffloadMetadata metadata(object_bucket_map_.size(), total_size_); return metadata; } tl::expected, ErrorCode> BucketStorageBackend::BuildBucket( + int64_t bucket_id, const std::unordered_map>& batch_object, - std::vector& iovs) { + std::vector& iovs, std::vector& metadatas) { auto bucket = std::make_shared(); int64_t storage_offset = 0; for (const auto& object : batch_object) { @@ -610,10 +625,12 @@ BucketStorageBackend::BuildBucket( iovs.emplace_back(iovec{slice.ptr, slice.size}); } bucket->data_size += object_total_size + object.first.size(); - bucket->object_metadata.emplace( - object.first, - BucketObjectMetadata{storage_offset, object.first.size(), - object_total_size}); + bucket->metadatas.emplace_back(BucketObjectMetadata{ + storage_offset, static_cast(object.first.size()), + object_total_size}); + metadatas.emplace_back(StorageObjectMetadata{ + bucket_id, storage_offset, + static_cast(object.first.size()), object_total_size}); bucket->keys.push_back(object.first); storage_offset += object_total_size + object.first.size(); } @@ -643,7 +660,12 @@ tl::expected BucketStorageBackend::WriteBucket( << ", error: " << write_result.error(); return tl::make_unexpected(write_result.error()); } - + if (write_result.value() != bucket_metadata->data_size) { + LOG(ERROR) << "Write size mismatch for: " << bucket_data_path + << ", expected: " << bucket_metadata->data_size + << ", got: " << write_result.value(); + return tl::make_unexpected(ErrorCode::FILE_WRITE_FAIL); + } auto store_bucket_metadata_result = StoreBucketMetadata(bucket_id, bucket_metadata); if (!store_bucket_metadata_result) { @@ -676,6 +698,12 @@ tl::expected BucketStorageBackend::StoreBucketMetadata( << ", error: " << write_result.error(); return tl::make_unexpected(write_result.error()); } + if (write_result.value() != str.size()) { + LOG(ERROR) << "Write size mismatch for: " << meta_path + << ", expected: " << str.size() + << ", got: " << write_result.value(); + return tl::make_unexpected(ErrorCode::FILE_WRITE_FAIL); + } metadata->meta_size = str.size(); return {}; } @@ -702,9 +730,10 @@ tl::expected BucketStorageBackend::LoadBucketMetadata( << ", error: " << read_result.error(); return tl::make_unexpected(read_result.error()); } - if (*read_result != size) { + if (read_result.value() != size) { LOG(ERROR) << "Read size mismatch for: " << meta_path - << ", expected: " << size << ", got: " << *read_result; + << ", expected: " << size + << ", got: " << read_result.value(); return tl::make_unexpected(ErrorCode::FILE_READ_FAIL); } try { @@ -722,41 +751,33 @@ tl::expected BucketStorageBackend::LoadBucketMetadata( tl::expected BucketStorageBackend::BatchLoadBucket( int64_t bucket_id, const std::vector& keys, - std::unordered_map& 
batched_slices) { + const std::vector& metadatas, + const std::unordered_map& batched_slices) { SharedMutexLocker locker(&mutex_, shared_lock); auto storage_filepath_res = GetBucketDataPath(bucket_id); if (!storage_filepath_res) { LOG(ERROR) << "Failed to get bucket data path, bucket_id=" << bucket_id; return tl::make_unexpected(ErrorCode::INTERNAL_ERROR); } - auto storage_filepath = storage_filepath_res.value(); + const auto& storage_filepath = storage_filepath_res.value(); auto open_file_result = OpenFile(storage_filepath, FileMode::Read); if (!open_file_result) { LOG(ERROR) << "Failed to open file for reading: " << storage_filepath; return tl::make_unexpected(open_file_result.error()); } auto file = std::move(open_file_result.value()); - for (const auto& key : keys) { + for (int64_t i = 0; i < keys.size(); i++) { + const auto& key = keys[i]; int64_t offset; - auto slice = batched_slices[key]; - auto bucket = buckets_.find(bucket_id); - if (bucket == buckets_.end()) { - LOG(ERROR) << "Bucket not found with id: " << bucket_id; - return tl::make_unexpected(ErrorCode::BUCKET_NOT_FOUND); - } - auto object_metadata = buckets_[bucket_id]->object_metadata.find(key); - if (object_metadata == buckets_[bucket_id]->object_metadata.end()) { - LOG(ERROR) << "Object metadata not found for key '" << key - << "' in bucket " << bucket_id; - return tl::make_unexpected(ErrorCode::OBJECT_NOT_FOUND); - } - if (object_metadata->second.data_size != slice.size) { + const auto& slice = batched_slices.at(key); + const auto& object_metadata = metadatas[i]; + if (object_metadata.data_size != slice.size) { LOG(ERROR) << "Read size mismatch for: " << storage_filepath - << ", expected: " << object_metadata->second.data_size + << ", expected: " << object_metadata.data_size << ", got: " << slice.size; return tl::make_unexpected(ErrorCode::FILE_READ_FAIL); } - offset = object_metadata->second.offset; + offset = object_metadata.offset; std::vector iovs; iovs.emplace_back(iovec{slice.ptr, slice.size}); auto read_result = file->vector_read( @@ -766,10 +787,10 @@ tl::expected BucketStorageBackend::BatchLoadBucket( << ", error: " << read_result.error(); return tl::make_unexpected(read_result.error()); } - if (*read_result != slice.size) { + if (read_result.value() != slice.size) { LOG(ERROR) << "Read size mismatch for: " << storage_filepath << ", expected: " << slice.size - << ", got: " << *read_result; + << ", got: " << read_result.value(); return tl::make_unexpected(ErrorCode::FILE_READ_FAIL); } } diff --git a/mooncake-store/tests/CMakeLists.txt b/mooncake-store/tests/CMakeLists.txt index 3cf5fea82..6c8d806c2 100644 --- a/mooncake-store/tests/CMakeLists.txt +++ b/mooncake-store/tests/CMakeLists.txt @@ -33,6 +33,7 @@ add_store_test(serializer_test serializer_test.cpp) add_store_test(non_ha_reconnect_test non_ha_reconnect_test.cpp) add_store_test(storage_backend_test storage_backend_test.cpp) add_store_test(mutex_test mutex_test.cpp) +add_store_test(file_storage_test file_storage_test.cpp) add_subdirectory(e2e) add_executable(high_availability_test high_availability_test.cpp) diff --git a/mooncake-store/tests/file_storage_test.cpp b/mooncake-store/tests/file_storage_test.cpp new file mode 100644 index 000000000..b4306e7ba --- /dev/null +++ b/mooncake-store/tests/file_storage_test.cpp @@ -0,0 +1,386 @@ +#include +#include + +#include + +#include "allocator.h" +#include "storage_backend.h" +#include "file_storage.h" + +namespace mooncake { + +void SetEnv(const std::string& key, const std::string& value) { + setenv(key.c_str(), 
value.c_str(), 1); +} + +void UnsetEnv(const std::string& key) { unsetenv(key.c_str()); } + +class FileStorageTest : public ::testing::Test { + protected: + std::string data_path; + void SetUp() override { + google::InitGoogleLogging("FileStorageTest"); + FLAGS_logtostderr = true; + UnsetEnv("FILE_STORAGE_PATH"); + UnsetEnv("LOCAL_BUFFER_SIZE_BYTES"); + UnsetEnv("BUCKET_ITERATOR_KEYS_LIMIT"); + UnsetEnv("BUCKET_KEYS_LIMIT"); + UnsetEnv("BUCKET_SIZE_LIMIT_BYTES"); + UnsetEnv("TOTAL_KEYS_LIMIT"); + UnsetEnv("TOTAL_SIZE_LIMIT_BYTES"); + UnsetEnv("HEARTBEAT_INTERVAL_SECONDS"); + data_path = std::filesystem::current_path().string() + "/data"; + fs::create_directories(data_path); + for (const auto& entry : fs::directory_iterator(data_path)) { + if (entry.is_regular_file()) { + fs::remove(entry.path()); + } + } + } + + tl::expected FileStorageBatchOffload( + FileStorage& fileStorage, std::vector& keys, + std::vector& sizes, + std::unordered_map& batch_data) { + std::vector buckets; + return BatchOffloadUtil(*fileStorage.storage_backend_, keys, sizes, + batch_data, buckets); + } + + tl::expected + FileStorageAllocateBatch(FileStorage& fileStorage, + const std::vector& keys, + const std::vector& sizes) { + return fileStorage.AllocateBatch(keys, sizes); + } + + tl::expected FileStorageBatchLoad( + FileStorage& fileStorage, + const std::unordered_map& batch_object) { + return fileStorage.BatchLoad(batch_object); + } + + tl::expected FileStorageIsEnableOffloading( + FileStorage& fileStorage) { + return fileStorage.IsEnableOffloading(); + } + + tl::expected FileStorageGroupOffloadingKeysByBucket( + FileStorage& fileStorage, + const std::unordered_map& offloading_objects, + std::vector>& buckets_keys) { + return fileStorage.GroupOffloadingKeysByBucket(offloading_objects, + buckets_keys); + } + + std::unordered_map GetUngroupedOffloadingObjects( + FileStorage& fileStorage) { + return fileStorage.ungrouped_offloading_objects_; + } + + void TearDown() override { + google::ShutdownGoogleLogging(); + LOG(INFO) << "Clear test data..."; + for (const auto& entry : fs::directory_iterator(data_path)) { + if (entry.is_regular_file()) { + fs::remove(entry.path()); + } + } + } +}; + +TEST_F(FileStorageTest, IsEnableOffloading) { + std::unordered_map all_object; + std::vector keys; + std::vector sizes; + std::unordered_map batch_data; + auto file_storage_config = FileStorageConfig::FromEnvironment(); + file_storage_config.storage_filepath = data_path; + file_storage_config.local_buffer_size = 128 * 1024 * 1024; + FileStorage fileStorage1(nullptr, "test", "localhost:9003", + file_storage_config); + ASSERT_TRUE(FileStorageBatchOffload(fileStorage1, keys, sizes, batch_data)); + auto enable_offloading_result1 = + FileStorageIsEnableOffloading(fileStorage1); + ASSERT_TRUE(enable_offloading_result1 && enable_offloading_result1.value()); + file_storage_config.bucket_keys_limit = 10; + file_storage_config.total_keys_limit = 91; + + FileStorage fileStorage2(nullptr, "test", "localhost:9003", + file_storage_config); + keys.clear(); + sizes.clear(); + ASSERT_TRUE(FileStorageBatchOffload(fileStorage2, keys, sizes, batch_data)); + auto enable_offloading_result2 = + FileStorageIsEnableOffloading(fileStorage2); + ASSERT_TRUE(enable_offloading_result2 && + !enable_offloading_result2.value()); + file_storage_config.bucket_size_limit = 969; + FileStorage fileStorage3(nullptr, "test", "localhost:9003", + file_storage_config); + keys.clear(); + sizes.clear(); + ASSERT_TRUE(FileStorageBatchOffload(fileStorage3, keys, sizes, batch_data)); 
+ auto enable_offloading_result3 = + FileStorageIsEnableOffloading(fileStorage3); + ASSERT_TRUE(enable_offloading_result3 && + !enable_offloading_result3.value()); +} + +TEST_F(FileStorageTest, BatchLoad) { + std::vector keys; + std::vector sizes; + std::unordered_map batch_data; + auto file_storage_config = FileStorageConfig::FromEnvironment(); + file_storage_config.storage_filepath = data_path; + FileStorage fileStorage(nullptr, "test", "localhost:9003", + file_storage_config); + ASSERT_TRUE(FileStorageBatchOffload(fileStorage, keys, sizes, batch_data)); + std::unordered_map batch_slice; + std::vector buff; + + auto allocate_res = FileStorageAllocateBatch(fileStorage, keys, sizes); + ASSERT_TRUE(allocate_res); + + ASSERT_TRUE(FileStorageBatchLoad(fileStorage, allocate_res.value().slices)); + for (auto& slice_it : batch_slice) { + std::string data(static_cast(slice_it.second.ptr), + slice_it.second.size); + LOG(INFO) << "key: " << slice_it.first; + ASSERT_EQ(data, batch_data.at(slice_it.first)); + } +} + +TEST_F(FileStorageTest, GroupOffloadingKeysByBucket_bucket_keys_limit) { + std::unordered_map offloading_objects; + for (size_t i = 0; i < 35; i++) { + offloading_objects.emplace("test" + std::to_string(i), 1); + } + std::vector> buckets_keys; + auto file_storage_config = FileStorageConfig::FromEnvironment(); + file_storage_config.storage_filepath = data_path; + file_storage_config.bucket_keys_limit = 10; + file_storage_config.bucket_iterator_keys_limit = 969; + FileStorage fileStorage(nullptr, "test", "localhost:9003", + file_storage_config); + ASSERT_TRUE(FileStorageGroupOffloadingKeysByBucket( + fileStorage, offloading_objects, buckets_keys)); + ASSERT_EQ(buckets_keys.size(), 3); + for (const auto& bucket_keys : buckets_keys) { + ASSERT_EQ(bucket_keys.size(), 10); + } + ASSERT_EQ(GetUngroupedOffloadingObjects(fileStorage).size(), 5); + buckets_keys.clear(); + ASSERT_TRUE(FileStorageGroupOffloadingKeysByBucket( + fileStorage, offloading_objects, buckets_keys)); + ASSERT_EQ(buckets_keys.size(), 4); + for (const auto& bucket_keys : buckets_keys) { + ASSERT_EQ(bucket_keys.size(), 10); + } + ASSERT_EQ(GetUngroupedOffloadingObjects(fileStorage).size(), 0); +} + +TEST_F(FileStorageTest, GroupOffloadingKeysByBucket_bucket_size_limit) { + std::unordered_map offloading_objects; + for (size_t i = 0; i < 35; i++) { + offloading_objects.emplace("test" + std::to_string(i), 1); + } + std::vector> buckets_keys; + auto file_storage_config = FileStorageConfig::FromEnvironment(); + file_storage_config.storage_filepath = data_path; + file_storage_config.bucket_size_limit = 10; + FileStorage fileStorage(nullptr, "test", "localhost:9003", + file_storage_config); + ASSERT_TRUE(FileStorageGroupOffloadingKeysByBucket( + fileStorage, offloading_objects, buckets_keys)); + ASSERT_EQ(buckets_keys.size(), 3); + for (const auto& bucket_keys : buckets_keys) { + ASSERT_EQ(bucket_keys.size(), 10); + } + ASSERT_EQ(GetUngroupedOffloadingObjects(fileStorage).size(), 5); + buckets_keys.clear(); + ASSERT_TRUE(FileStorageGroupOffloadingKeysByBucket( + fileStorage, offloading_objects, buckets_keys)); + ASSERT_EQ(buckets_keys.size(), 4); + for (const auto& bucket_keys : buckets_keys) { + ASSERT_EQ(bucket_keys.size(), 10); + } + ASSERT_EQ(GetUngroupedOffloadingObjects(fileStorage).size(), 0); +} + +TEST_F(FileStorageTest, + GroupOffloadingKeysByBucket_bucket_size_limit_and_bucket_keys_limit) { + std::unordered_map offloading_objects; + for (size_t i = 0; i < 500; i++) { + offloading_objects.emplace("test" + std::to_string(i), 
i); + } + std::vector> buckets_keys; + auto file_storage_config = FileStorageConfig::FromEnvironment(); + file_storage_config.storage_filepath = data_path; + file_storage_config.bucket_keys_limit = 9; + file_storage_config.bucket_size_limit = 496; + FileStorage fileStorage(nullptr, "test", "localhost:9003", + file_storage_config); + ASSERT_TRUE(FileStorageGroupOffloadingKeysByBucket( + fileStorage, offloading_objects, buckets_keys)); + for (size_t i = 0; i < buckets_keys.size(); i++) { + auto bucket_keys = buckets_keys.at(i); + ASSERT_TRUE(bucket_keys.size() <= 9); + size_t total_size = 0; + std::string keys; + for (const auto& bucket_key : bucket_keys) { + total_size += offloading_objects.at(bucket_key); + keys += bucket_key + ","; + } + ASSERT_TRUE(total_size <= 496); + } +} + +TEST_F(FileStorageTest, + GroupOffloadingKeysByBucket_ungrouped_offloading_objects) { + std::unordered_map offloading_objects; + for (size_t i = 0; i < 1; i++) { + offloading_objects.emplace("test" + std::to_string(i), 1); + } + std::vector> buckets_keys; + auto file_storage_config = FileStorageConfig::FromEnvironment(); + file_storage_config.storage_filepath = data_path; + FileStorage fileStorage(nullptr, "test", "localhost:9003", + file_storage_config); + ASSERT_TRUE(FileStorageGroupOffloadingKeysByBucket( + fileStorage, offloading_objects, buckets_keys)); + offloading_objects.clear(); + ASSERT_TRUE(FileStorageGroupOffloadingKeysByBucket( + fileStorage, offloading_objects, buckets_keys)); + for (size_t i = 0; i < 7; i++) { + offloading_objects.emplace("test" + std::to_string(i), 1); + } + ASSERT_TRUE(FileStorageGroupOffloadingKeysByBucket( + fileStorage, offloading_objects, buckets_keys)); +} + +TEST_F(FileStorageTest, DefaultValuesWhenNoEnvSet) { + auto config = FileStorageConfig::FromEnvironment(); + + EXPECT_EQ(config.storage_filepath, "/data/file_storage"); + EXPECT_EQ(config.local_buffer_size, 1280 * 1024 * 1024); + EXPECT_EQ(config.bucket_iterator_keys_limit, 20000); + EXPECT_EQ(config.bucket_keys_limit, 500); + EXPECT_EQ(config.bucket_size_limit, 256 * 1024 * 1024); + EXPECT_EQ(config.total_keys_limit, 10'000'000); + EXPECT_EQ(config.total_size_limit, 2ULL * 1024 * 1024 * 1024 * 1024); + EXPECT_EQ(config.heartbeat_interval_seconds, 10u); +} + +TEST_F(FileStorageTest, ReadStringFromEnv) { + SetEnv("FILE_STORAGE_PATH", "/tmp/storage"); + + auto config = FileStorageConfig::FromEnvironment(); + EXPECT_EQ(config.storage_filepath, "/tmp/storage"); +} + +TEST_F(FileStorageTest, ReadInt64FromEnv) { + SetEnv("LOCAL_BUFFER_SIZE_BYTES", "2147483648"); // 2GB + SetEnv("BUCKET_KEYS_LIMIT", "1000"); + SetEnv("TOTAL_KEYS_LIMIT", "5000000"); + + auto config = FileStorageConfig::FromEnvironment(); + + EXPECT_EQ(config.local_buffer_size, 2147483648); + EXPECT_EQ(config.bucket_keys_limit, 1000); + EXPECT_EQ(config.total_keys_limit, 5000000); +} + +TEST_F(FileStorageTest, ReadUint32FromEnv) { + SetEnv("HEARTBEAT_INTERVAL_SECONDS", "5"); + + auto config = FileStorageConfig::FromEnvironment(); + EXPECT_EQ(config.heartbeat_interval_seconds, 5u); +} + +TEST_F(FileStorageTest, InvalidIntValueUsesDefault) { + SetEnv("BUCKET_KEYS_LIMIT", "abc"); + SetEnv("TOTAL_SIZE_LIMIT_BYTES", "sdfsdf"); + SetEnv("HEARTBEAT_INTERVAL_SECONDS", "-1"); + + auto config = FileStorageConfig::FromEnvironment(); + + EXPECT_EQ(config.bucket_keys_limit, 500); + EXPECT_EQ(config.total_size_limit, 2ULL * 1024 * 1024 * 1024 * 1024); + EXPECT_EQ(config.heartbeat_interval_seconds, 10u); +} + +TEST_F(FileStorageTest, OutOfRangeValueUsesDefault) { + 
+    SetEnv("HEARTBEAT_INTERVAL_SECONDS", "4294967296"); // > UINT32_MAX
+    SetEnv("HEARTBEAT_INTERVAL_SECONDS", "-10"); // negative
+
+    auto config = FileStorageConfig::FromEnvironment();
+    EXPECT_EQ(config.heartbeat_interval_seconds, 10u); // fallback to default
+}
+
+TEST_F(FileStorageTest, EmptyEnvValueUsesDefault) {
+    SetEnv("BUCKET_KEYS_LIMIT", ""); // empty string
+
+    auto config = FileStorageConfig::FromEnvironment();
+    EXPECT_EQ(config.bucket_keys_limit, 500); // fallback
+}
+
+TEST_F(FileStorageTest, ValidateSuccessWithValidConfig) {
+    FileStorageConfig config;
+    config.storage_filepath = "/valid/path";
+    config.bucket_keys_limit = 100;
+    config.bucket_size_limit = 100000;
+    config.total_keys_limit = 1000000;
+    config.total_size_limit = 1073741824; // 1GB
+    config.heartbeat_interval_seconds = 5;
+
+    EXPECT_TRUE(config.Validate());
+}
+
+TEST_F(FileStorageTest, ValidateFailsOnEmptyStoragePath) {
+    FileStorageConfig config;
+    config.storage_filepath = "";
+    EXPECT_FALSE(config.Validate());
+    config.storage_filepath = " ";
+    EXPECT_FALSE(config.Validate());
+    config.storage_filepath = "relative/path";
+    EXPECT_FALSE(config.Validate());
+    config.storage_filepath = "./data";
+    EXPECT_FALSE(config.Validate());
+    config.storage_filepath = "../data";
+    EXPECT_FALSE(config.Validate());
+    config.storage_filepath = "/valid/../invalid";
+    EXPECT_FALSE(config.Validate());
+    config.storage_filepath = "/path/./sub";
+    EXPECT_FALSE(config.Validate());
+    config.storage_filepath = "/tmp/this_directory_does_not_exist_12345";
+    EXPECT_FALSE(config.Validate());
+    config.storage_filepath = data_path;
+    EXPECT_TRUE(config.Validate());
+}
+
+TEST_F(FileStorageTest, ValidateFailsOnInvalidLimits) {
+    FileStorageConfig config;
+    config.storage_filepath = "/tmp";
+
+    config.bucket_keys_limit = 0;
+    EXPECT_FALSE(config.Validate());
+
+    config.bucket_keys_limit = 1;
+    config.bucket_size_limit = 0;
+    EXPECT_FALSE(config.Validate());
+
+    config.bucket_size_limit = 1;
+    config.total_keys_limit = 0;
+    EXPECT_FALSE(config.Validate());
+
+    config.total_keys_limit = 1;
+    config.total_size_limit = 0;
+    EXPECT_FALSE(config.Validate());
+
+    config.total_size_limit = 1;
+    config.heartbeat_interval_seconds = 0;
+    EXPECT_FALSE(config.Validate());
+}
+
+} // namespace mooncake
\ No newline at end of file
diff --git a/mooncake-store/tests/storage_backend_test.cpp b/mooncake-store/tests/storage_backend_test.cpp
index 766477d9e..41dbdd956 100644
--- a/mooncake-store/tests/storage_backend_test.cpp
+++ b/mooncake-store/tests/storage_backend_test.cpp
@@ -1,26 +1,35 @@
+#include "storage_backend.h"
+
 #include
 #include
-#include "storage_backend.h"
-#include "allocator.h"
-#include "utils.h"
-#include
+
 #include
 #include
 #include
+#include "allocator.h"
+#include "utils.h"
+#include "utils/common.h"
+
 namespace fs = std::filesystem;
 
 namespace mooncake::test {
 class StorageBackendTest : public ::testing::Test {
   protected:
+    std::string data_path;
     void SetUp() override {
         google::InitGoogleLogging("StorageBackendTest");
         FLAGS_logtostderr = true;
+        data_path = std::filesystem::current_path().string() + "/data";
+        fs::create_directories(data_path);
+        for (const auto& entry : fs::directory_iterator(data_path)) {
+            if (entry.is_regular_file()) {
+                fs::remove(entry.path());
+            }
+        }
     }
 
     void TearDown() override {
         google::ShutdownGoogleLogging();
-        std::string data_path =
-            std::filesystem::current_path().string() + "/data";
         LOG(INFO) << "Clear test data...";
         for (const auto& entry : fs::directory_iterator(data_path)) {
             if (entry.is_regular_file()) {
@@ -30,79 +39,20 @@ class StorageBackendTest : public ::testing::Test {
     }
 };
 
-tl::expected BatchOffload(
-    std::vector& keys,
-    std::unordered_map& batch_data,
-    std::shared_ptr& client_buffer_allocator,
-    BucketStorageBackend& storage_backend, std::vector& buckets) {
-    size_t bucket_sz = 10;
-    size_t batch_sz = 10;
-    size_t data_sz = 10;
-    for (size_t i = 0; i < bucket_sz; i++) {
-        std::unordered_map> batched_slices;
-        for (size_t j = 0; j < batch_sz; j++) {
-            std::string key =
-                "test_key_i_" + std::to_string(i) + "_j_" + std::to_string(j);
-            size_t data_size = 0;
-            std::string all_data;
-            std::vector slices;
-            for (size_t k = 0; k < data_sz; k++) {
-                std::string data = "test_data_i_" + std::to_string(i) + "_j_" +
-                                   std::to_string(j) + "_k_" +
-                                   std::to_string(k);
-                all_data += data;
-                data_size += data.size();
-                void* buffer = client_buffer_allocator->allocate(data.size());
-                memcpy(buffer, data.data(), data.size());
-                slices.emplace_back(Slice{buffer, data.size()});
-            }
-            batched_slices.emplace(key, slices);
-            batch_data.emplace(key, all_data);
-            keys.emplace_back(key);
-        }
-        auto batch_store_object_one_result = storage_backend.BatchOffload(
-            batched_slices,
-            [&](const std::unordered_map&
-                    keys) {
-                if (keys.size() != batched_slices.size()) {
-                    return ErrorCode::INVALID_KEY;
-                }
-                for (const auto& key : keys) {
-                    if (batched_slices.find(key.first) ==
-                        batched_slices.end()) {
-                        return ErrorCode::INVALID_KEY;
-                    }
-                }
-                return ErrorCode::OK;
-            });
-        if (!batch_store_object_one_result) {
-            return tl::make_unexpected(batch_store_object_one_result.error());
-        }
-        buckets.emplace_back(batch_store_object_one_result.value());
-    }
-    return {};
-}
-
 TEST_F(StorageBackendTest, StorageBackendAll) {
-    std::string data_path = std::filesystem::current_path().string() + "/data";
-    fs::create_directories(data_path);
     std::shared_ptr client_buffer_allocator =
         std::make_shared(128 * 1024 * 1024);
     BucketStorageBackend storage_backend(data_path);
-    for (const auto& entry : fs::directory_iterator(data_path)) {
-        if (entry.is_regular_file()) {
-            fs::remove(entry.path());
-        }
-    }
+
     ASSERT_TRUE(storage_backend.Init());
     ASSERT_TRUE(fs::directory_iterator(data_path) == fs::directory_iterator{});
     ASSERT_TRUE(!storage_backend.Init());
     std::unordered_map test_data;
     std::vector keys;
+    std::vector sizes;
     std::vector buckets;
-    auto test_batch_store_object_result = BatchOffload(
-        keys, test_data, client_buffer_allocator, storage_backend, buckets);
-    ASSERT_TRUE(test_batch_store_object_result);
+    ASSERT_TRUE(
+        BatchOffloadUtil(storage_backend, keys, sizes, test_data, buckets));
 
     std::unordered_map batche_object_metadata;
@@ -143,38 +93,37 @@ TEST_F(StorageBackendTest, StorageBackendAll) {
     }
 }
 TEST_F(StorageBackendTest, BucketScan) {
-    std::string data_path = std::filesystem::current_path().string() + "/data";
-    fs::create_directories(data_path);
     std::shared_ptr client_buffer_allocator =
         std::make_shared(128 * 1024 * 1024);
     BucketStorageBackend storage_backend(data_path);
-    for (const auto& entry : fs::directory_iterator(data_path)) {
-        if (entry.is_regular_file()) {
-            fs::remove(entry.path());
-        }
-    }
     ASSERT_TRUE(storage_backend.Init());
     ASSERT_TRUE(!storage_backend.Init());
     std::unordered_map test_data;
     std::vector keys;
+    std::vector sizes;
    std::vector buckets;
-    auto test_batch_store_object_result = BatchOffload(
-        keys, test_data, client_buffer_allocator, storage_backend, buckets);
-    ASSERT_TRUE(test_batch_store_object_result);
-    std::unordered_map objects;
+    ASSERT_TRUE(
+        BatchOffloadUtil(storage_backend, keys, sizes, test_data, buckets));
+    std::vector scan_keys;
+    std::vector scan_metadatas;
     std::vector scan_buckets;
-    auto res = storage_backend.BucketScan(0, objects, scan_buckets, 10);
+    auto res = storage_backend.BucketScan(0, scan_keys, scan_metadatas,
+                                          scan_buckets, 10);
     ASSERT_TRUE(res);
     ASSERT_EQ(res.value(), buckets.at(1));
-    for (const auto& object : objects) {
-        ASSERT_EQ(object.second.data_size, test_data.at(object.first).size());
-        ASSERT_EQ(object.second.key_size, object.first.size());
+    for (size_t i = 0; i < scan_keys.size(); i++) {
+        ASSERT_EQ(scan_metadatas[i].data_size,
+                  test_data.at(scan_keys[i]).size());
+        ASSERT_EQ(scan_metadatas[i].key_size, scan_keys[i].size());
     }
     ASSERT_EQ(scan_buckets.size(), 1);
     ASSERT_EQ(scan_buckets.at(0), buckets.at(0));
-    objects.clear();
+    scan_keys.clear();
+    scan_metadatas.clear();
     scan_buckets.clear();
-    res = storage_backend.BucketScan(0, objects, scan_buckets, 45);
+    res = storage_backend.BucketScan(0, scan_keys, scan_metadatas, scan_buckets,
+                                     45);
     ASSERT_TRUE(res);
     ASSERT_EQ(res.value(), buckets.at(4));
     ASSERT_EQ(scan_buckets.size(), 4);
@@ -182,9 +131,11 @@ TEST_F(StorageBackendTest, BucketScan) {
         ASSERT_EQ(scan_buckets.at(i), buckets.at(i));
     }
 
-    objects.clear();
+    scan_keys.clear();
+    scan_metadatas.clear();
     scan_buckets.clear();
-    res = storage_backend.BucketScan(buckets.at(4), objects, scan_buckets, 45);
+    res = storage_backend.BucketScan(buckets.at(4), scan_keys, scan_metadatas,
+                                     scan_buckets, 45);
     ASSERT_TRUE(res);
     ASSERT_EQ(res.value(), buckets.at(8));
     ASSERT_EQ(scan_buckets.size(), 4);
@@ -192,29 +143,35 @@
         ASSERT_EQ(scan_buckets.at(i), buckets.at(i + 4));
     }
 
-    objects.clear();
+    scan_keys.clear();
+    scan_metadatas.clear();
     scan_buckets.clear();
-    res = storage_backend.BucketScan(buckets.at(9), objects, scan_buckets, 45);
+    res = storage_backend.BucketScan(buckets.at(9), scan_keys, scan_metadatas,
+                                     scan_buckets, 45);
     ASSERT_TRUE(res);
     ASSERT_EQ(res.value(), 0);
     ASSERT_EQ(scan_buckets.size(), 1);
     ASSERT_EQ(scan_buckets.at(0), buckets.at(9));
 
-    objects.clear();
+    scan_keys.clear();
+    scan_metadatas.clear();
     scan_buckets.clear();
-    res = storage_backend.BucketScan(buckets.at(9) + 10, objects, scan_buckets,
-                                     45);
+    res = storage_backend.BucketScan(buckets.at(9) + 10, scan_keys,
+                                     scan_metadatas, scan_buckets, 45);
     ASSERT_TRUE(res);
     ASSERT_EQ(res.value(), 0);
     ASSERT_EQ(scan_buckets.size(), 0);
 
-    objects.clear();
+    scan_keys.clear();
+    scan_metadatas.clear();
     scan_buckets.clear();
-    res = storage_backend.BucketScan(0, objects, scan_buckets, 8);
+    res = storage_backend.BucketScan(0, scan_keys, scan_metadatas, scan_buckets,
+                                     8);
     ASSERT_TRUE(!res);
-    ASSERT_EQ(res.error(), ErrorCode::KEYS_ULTRA_BUCKET_LIMIT);
+    ASSERT_EQ(res.error(), ErrorCode::KEYS_EXCEED_BUCKET_LIMIT);
     ASSERT_EQ(scan_buckets.size(), 0);
-    ASSERT_EQ(objects.size(), 0);
+    ASSERT_EQ(scan_keys.size(), 0);
+    ASSERT_EQ(scan_metadatas.size(), 0);
 }
 
 TEST_F(StorageBackendTest, InitializeWithValidStart) {
diff --git a/mooncake-store/tests/utils/common.h b/mooncake-store/tests/utils/common.h
new file mode 100644
index 000000000..d3484669a
--- /dev/null
+++ b/mooncake-store/tests/utils/common.h
@@ -0,0 +1,59 @@
+#pragma once
+#include "file_storage.h"
+#include "storage_backend.h"
+namespace mooncake {
+namespace fs = std::filesystem;
+inline tl::expected BatchOffloadUtil(
+    BucketStorageBackend& storage_backend, std::vector& keys,
+    std::vector& sizes,
+    std::unordered_map& batched_data,
+    std::vector& buckets) {
+    storage_backend.Init();
+    std::shared_ptr client_buffer_allocator =
+        std::make_shared(128 * 1024 * 1024);
+    size_t bucket_sz = 10;
+    size_t batch_sz = 10;
+    size_t data_sz = 10;
+    for (size_t i = 0; i < bucket_sz; i++) {
+        std::unordered_map> batched_slices;
+        for (size_t j = 0; j < batch_sz; j++) {
+            std::string key =
+                "test_key_i_" + std::to_string(i) + "_j_" + std::to_string(j);
+            std::string all_data;
+            std::vector slices;
+            for (size_t k = 0; k < data_sz; k++) {
+                std::string data = "test_data_i_" + std::to_string(i) + "_j_" +
+                                   std::to_string(j) + "_k_" +
+                                   std::to_string(k);
+                all_data += data;
+                void* buffer = client_buffer_allocator->allocate(data.size());
+                memcpy(buffer, data.data(), data.size());
+                slices.emplace_back(Slice{buffer, data.size()});
+            }
+            batched_slices.emplace(key, slices);
+            batched_data.emplace(key, all_data);
+            keys.emplace_back(key);
+            sizes.emplace_back(all_data.size());
+        }
+        auto batch_store_object_one_result = storage_backend.BatchOffload(
+            batched_slices,
+            [&](const std::vector& keys,
+                const std::vector& metadatas) {
+                if (keys.size() != batched_slices.size()) {
+                    return ErrorCode::INVALID_KEY;
+                }
+                for (const auto& key : keys) {
+                    if (batched_slices.find(key) == batched_slices.end()) {
+                        return ErrorCode::INVALID_KEY;
+                    }
+                }
+                return ErrorCode::OK;
+            });
+        if (!batch_store_object_one_result) {
+            return tl::make_unexpected(batch_store_object_one_result.error());
+        }
+        buckets.emplace_back(batch_store_object_one_result.value());
+    }
+    return {};
+}
+} // namespace mooncake
\ No newline at end of file
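
Note on the FromEnvironment tests in this patch: they pin down a single fallback contract, namely that an environment variable which is unset, empty, non-numeric, negative for an unsigned field, or outside the target type's range leaves the compiled-in default untouched (see InvalidIntValueUsesDefault, EmptyEnvValueUsesDefault, and OutOfRangeValueUsesDefault). The snippet below is a minimal sketch of an integer-parsing helper that would satisfy those expectations; the helper name GetEnvIntegerOr, the use of std::strtoll, and the explicit range checks are illustrative assumptions, not the GetEnvOr implementation shipped in this PR.

// Hypothetical sketch, not the PR's code: parse an integer environment
// variable and fall back to `default_value` on any malformed input.
// Assumes T's range fits in long long (true for int64_t and uint32_t,
// the field types used by FileStorageConfig).
#include <cerrno>
#include <cstdlib>
#include <limits>

template <typename T>
T GetEnvIntegerOr(const char* name, T default_value) {
    const char* raw = std::getenv(name);
    if (raw == nullptr || raw[0] == '\0') {
        return default_value;  // unset or empty string -> keep default
    }
    errno = 0;
    char* end = nullptr;
    const long long parsed = std::strtoll(raw, &end, 10);
    if (end == raw || *end != '\0' || errno == ERANGE) {
        return default_value;  // "abc", "sdfsdf", or overflow -> keep default
    }
    if (parsed < 0 && !std::numeric_limits<T>::is_signed) {
        return default_value;  // e.g. "-10" for a uint32_t field
    }
    if (parsed < static_cast<long long>(std::numeric_limits<T>::min()) ||
        parsed > static_cast<long long>(std::numeric_limits<T>::max())) {
        return default_value;  // e.g. "4294967296" for a uint32_t field
    }
    return static_cast<T>(parsed);
}

Under these assumptions, GetEnvIntegerOr<uint32_t>("HEARTBEAT_INTERVAL_SECONDS", 10) returns 10 for "abc", "", "-10", and "4294967296", which is exactly the behaviour the tests above assert for FileStorageConfig::FromEnvironment.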