Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions mooncake-store/include/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,61 @@ class Client {
std::vector<tl::expected<bool, ErrorCode>> BatchIsExist(
const std::vector<std::string>& keys);

/**
* @brief Mounts a file storage segment into the master.
* @param segment_name Unique identifier for the storage segment to mount.
* @param local_rpc_addr Local RPC address (e.g., "ip:port") where the
* segment will listen for requests.
* @param enable_offloading If true, enables offloading (write-to-file).
* @return tl::expected<void, ErrorCode> indicating operation status.
*/
tl::expected<void, ErrorCode> MountFileStorage(
const std::string& segment_name, const std::string& local_rpc_addr,
bool enable_offloading);

/**
* @brief Heartbeat call to collect object-level statistics and retrieve the
* set of objects that have not been offloaded yet and require offload.
* @param segment_name Name of the storage segment associated with this
* heartbeat.
* @param enable_offloading Indicates whether offloading is enabled for this
* segment.
* @param offloading_objects Output parameter: on return, contains a map from
* object key to size (in bytes) for all objects that require offload.
* @return tl::expected<void, ErrorCode> indicating operation status.
*/
tl::expected<void, ErrorCode> OffloadObjectHeartbeat(
const std::string& segment_name, bool enable_offloading,
std::unordered_map<std::string, int64_t>& offloading_objects);

/**
* @brief Performs a batched write of multiple objects using a
* high-throughput Transfer Engine.
* @param transfer_engine_addr Address of the Transfer Engine service (e.g.,
* "ip:port").
* @param keys List of keys identifying the data objects to be transferred.
* @param pointers Array of destination memory addresses on the remote node
* where data will be written (one per key, i.e. parallel to `keys`).
* @param batched_slices Map from object key to its data slice
* (`mooncake::Slice`), containing raw bytes to be written.
* @return tl::expected<void, ErrorCode> indicating operation status.
*/
tl::expected<void, ErrorCode> BatchPutOffloadObject(
const std::string& transfer_engine_addr,
const std::vector<std::string>& keys,
const std::vector<uintptr_t>& pointers,
const std::unordered_map<std::string, Slice>& batched_slices);

/**
* @brief Notifies the master that offloading of specified objects has
* succeeded.
* @param segment_name Name of the target storage segment to which objects
* will be added.
* @param keys List of object keys that were successfully offloaded.
* @param metadatas Corresponding metadata for each key
* (`StorageObjectMetadata`: bucket id, offset, key size, data size).
* @return tl::expected<void, ErrorCode> indicating operation status.
*/
tl::expected<void, ErrorCode> NotifyOffloadSuccess(
const std::string& segment_name, const std::vector<std::string>& keys,
const std::vector<StorageObjectMetadata>& metadatas);

// For human-readable metrics
tl::expected<std::string, ErrorCode> GetSummaryMetrics() {
if (metrics_ == nullptr) {
Expand Down
182 changes: 182 additions & 0 deletions mooncake-store/include/file_storage.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
#pragma once

#include "client.h"
#include "client_buffer.hpp"
#include "storage_backend.h"
#include "../tests/utils/common.h"

namespace mooncake {

/**
 * @brief Tunable settings for the file storage component.
 *
 * Every field carries a default, so a default-constructed instance is usable
 * as-is. All byte sizes are computed in signed 64-bit arithmetic.
 */
struct FileStorageConfig {
    // On-disk location under which data files are stored.
    std::string storage_filepath = "/data/file_storage";

    // Local client-side buffer size in bytes (used for caching or batching).
    // Default: 1280 MiB (1.25 GiB).
    int64_t local_buffer_size = 1280LL * 1024 * 1024;

    // --- Limits for scanning and iteration operations ---

    // Max number of keys returned per Scan call.
    int64_t bucket_iterator_keys_limit = 20000;

    // Max number of keys allowed in a single bucket.
    int64_t bucket_keys_limit = 500;

    // Max total size of a single bucket in bytes. Default: 256 MiB.
    int64_t bucket_size_limit = 256LL * 1024 * 1024;

    // --- Global limits across all buckets ---

    // Maximum total number of keys.
    int64_t total_keys_limit = 10'000'000;

    // Maximum total storage size in bytes. Default: 2 TiB.
    int64_t total_size_limit = 2LL * 1024 * 1024 * 1024 * 1024;

    // Interval between heartbeats sent to the control plane, in seconds.
    uint32_t heartbeat_interval_seconds = 10;

    /**
     * @brief Validates the configuration for correctness and consistency.
     * @return bool result of the validation (presumably true when the
     * configuration is acceptable; the implementation is not in this header).
     */
    bool Validate() const;

    /**
     * @brief Static factory that builds a config from environment variables.
     *
     * Falls back to the default value of a field when its environment
     * variable is not set or is invalid.
     *
     * @return FileStorageConfig populated from the environment or defaults.
     */
    static FileStorageConfig FromEnvironment();

    /**
     * @brief String lookup helper for environment variables.
     * @param name Name of the environment variable.
     * @param default_value Fallback value (presumably returned when the
     * variable is unset; confirm against the implementation).
     */
    static std::string GetEnvStringOr(const char* name,
                                      const std::string& default_value);

    /**
     * @brief Typed lookup helper for environment variables.
     * @tparam T Type of the value to return.
     * @param name Name of the environment variable.
     * @param default_value Fallback value (presumably returned when the
     * variable is unset or cannot be parsed; confirm against the
     * implementation).
     */
    template <typename T>
    static T GetEnvOr(const char* name, T default_value);
};

/**
* @brief Stateful iterator over the contents of a BucketStorageBackend.
*
* NOTE(review): only the declarations are visible here. Presumably each
* HandleNext() call scans up to `limit` entries starting at the remembered
* bucket position (cf. BucketStorageBackend::BucketScan) - confirm against
* the implementation in file_storage.cpp.
*/
class BucketIterator {
public:
/**
* @param storage_backend Backend whose buckets are iterated.
* @param limit Stored in limit_; presumably the per-call cap on the number
* of entries handed to the handler - TODO confirm.
*/
BucketIterator(std::shared_ptr<BucketStorageBackend> storage_backend,
int64_t limit);

/**
* @brief Passes the next batch of entries to `handler`.
* @param handler Callback receiving three vectors: object keys, their
* metadata, and bucket IDs. It returns an ErrorCode.
* @return tl::expected<void, ErrorCode> indicating operation status.
*/
tl::expected<void, ErrorCode> HandleNext(
const std::function<
ErrorCode(const std::vector<std::string>& keys,
const std::vector<StorageObjectMetadata>& metadatas,
const std::vector<int64_t>& buckets)>& handler);

/**
* @brief Reports whether more entries remain to be iterated.
* @return A bool on success (presumably true when HandleNext() has more
* data to deliver - confirm), or an ErrorCode on failure.
*/
tl::expected<bool, ErrorCode> HasNext();

private:
std::shared_ptr<BucketStorageBackend> storage_backend_;
int64_t limit_;
// Guards next_bucket_.
mutable Mutex mutex_;
// Iteration position; starts at -1 (presumably meaning "iteration has not
// started yet" - confirm against the implementation).
int64_t GUARDED_BY(mutex_) next_bucket_ = -1;
};

/**
 * @brief File-backed storage component.
 *
 * Per the method documentation below, it offloads objects into buckets,
 * exchanges heartbeats with the master through the Client, and serves
 * batched reads that forward locally stored values to a remote node.
 */
class FileStorage {
   public:
    /**
     * @param client Client used to talk to the master.
     * @param segment_name Name of the storage segment.
     * @param local_rpc_addr Local RPC address of this node.
     * @param config Configuration; stored by value in config_.
     */
    FileStorage(std::shared_ptr<Client> client, const std::string& segment_name,
                const std::string& local_rpc_addr,
                const FileStorageConfig& config);
    ~FileStorage();

    /**
     * @brief Initializes the component.
     * NOTE(review): presumably must be called once before any other
     * operation; the implementation is not visible in this header.
     * @return tl::expected<void, ErrorCode> indicating operation status.
     */
    tl::expected<void, ErrorCode> Init();

    /**
     * @brief Reads multiple key-value (KV) entries from local storage and
     * forwards them to a remote node.
     * @param transfer_engine_addr Address of the remote transfer engine
     * (format: "ip:port")
     * @param keys List of keys to read from the local KV store
     * @param pointers Array of remote memory base addresses (on the
     * destination node) where each corresponding value will be written
     * @param sizes Expected size in bytes for each value
     * @return tl::expected<void, ErrorCode> indicating operation status.
     */
    tl::expected<void, ErrorCode> BatchGet(
        const std::string& transfer_engine_addr,
        const std::vector<std::string>& keys,
        const std::vector<uintptr_t>& pointers,
        const std::vector<int64_t>& sizes);

   private:
    friend class FileStorageTest;

    /**
     * @brief Move-only bundle of buffer handles and the per-key slices
     * associated with them.
     * NOTE(review): presumably the slices point into memory owned by the
     * handles, so both must live together - confirm in AllocateBatch().
     */
    struct AllocatedBatch {
        std::vector<BufferHandle> handles;
        std::unordered_map<std::string, Slice> slices;

        AllocatedBatch() = default;
        AllocatedBatch(AllocatedBatch&&) = default;
        AllocatedBatch& operator=(AllocatedBatch&&) = default;

        AllocatedBatch(const AllocatedBatch&) = delete;
        AllocatedBatch& operator=(const AllocatedBatch&) = delete;

        ~AllocatedBatch() = default;
    };

    /**
     * @brief Offload object data and metadata.
     * @param offloading_objects Map of object keys to their sizes.
     * @return tl::expected<void, ErrorCode> indicating operation status.
     */
    tl::expected<void, ErrorCode> OffloadObjects(
        const std::unordered_map<std::string, int64_t>& offloading_objects);

    /**
     * @brief Groups offloading keys into buckets based on size and existence
     * checks.
     * @param offloading_objects Input map of object keys and their sizes
     * (e.g., byte size).
     * @param buckets_keys Output parameter: receives a 2D vector where:
     *                    - Each outer element represents a bucket.
     *                    - Each inner vector contains the newly allocated
     *                      object keys within that bucket.
     * @return tl::expected<void, ErrorCode> indicating operation status.
     */
    tl::expected<void, ErrorCode> GroupOffloadingKeysByBucket(
        const std::unordered_map<std::string, int64_t>& offloading_objects,
        std::vector<std::vector<std::string>>& buckets_keys);

    /**
     * @brief Performs a heartbeat operation for the FileStorage component.
     * 1. Sends object status (e.g., access frequency, size) to the master via
     * client.
     * 2. Receives feedback on which objects should be offloaded.
     * 3. Triggers asynchronous offloading of pending objects.
     * @return tl::expected<void, ErrorCode> indicating operation status.
     */
    tl::expected<void, ErrorCode> Heartbeat();

    /**
     * @brief Reports whether offloading is enabled.
     * @return A bool on success (presumably the current offloading state -
     * confirm against the implementation), or an ErrorCode on failure.
     */
    tl::expected<bool, ErrorCode> IsEnableOffloading();

    /**
     * @brief Offloads the objects identified by `keys` as one batch.
     * @return tl::expected<void, ErrorCode> indicating operation status.
     */
    tl::expected<void, ErrorCode> BatchOffload(
        const std::vector<std::string>& keys);

    /**
     * @brief Loads the objects named in `batch_object`.
     * @param batch_object Map from object key to a slice (presumably the
     * destination buffer for the loaded data - confirm).
     * @return tl::expected<void, ErrorCode> indicating operation status.
     */
    tl::expected<void, ErrorCode> BatchLoad(
        const std::unordered_map<std::string, Slice>& batch_object);

    /**
     * @brief Queries the slices for each key in `keys`.
     * @param keys Object keys to query.
     * @param batched_slices Output parameter: map from key to its slices.
     * @return tl::expected<void, ErrorCode> indicating operation status.
     */
    tl::expected<void, ErrorCode> BatchQuerySegmentSlices(
        const std::vector<std::string>& keys,
        std::unordered_map<std::string, std::vector<Slice>>& batched_slices);

    /**
     * @brief Registers local memory.
     * NOTE(review): presumably registers the client buffer with the transfer
     * engine; the implementation is not visible in this header.
     * @return tl::expected<void, ErrorCode> indicating operation status.
     */
    tl::expected<void, ErrorCode> RegisterLocalMemory();

    /**
     * @brief Allocates buffers for a batch of objects.
     * @param keys Object keys to allocate for.
     * @param sizes Sizes for the allocations (presumably one per key, in
     * bytes - confirm).
     * @return The AllocatedBatch on success, or an ErrorCode on failure.
     */
    tl::expected<AllocatedBatch, ErrorCode> AllocateBatch(
        const std::vector<std::string>& keys,
        const std::vector<int64_t>& sizes);

    std::shared_ptr<Client> client_;
    std::string segment_name_;
    std::string local_rpc_addr_;
    FileStorageConfig config_;
    std::shared_ptr<BucketStorageBackend> storage_backend_;
    std::shared_ptr<ClientBufferAllocator> client_buffer_allocator_;
    BucketIterator sync_stat_bucket_iterator_;

    // Guards ungrouped_offloading_objects_ and enable_offloading_.
    mutable Mutex offloading_mutex_;
    std::unordered_map<std::string, int64_t> GUARDED_BY(offloading_mutex_)
        ungrouped_offloading_objects_;
    // Defaults to false so the flag never holds an indeterminate value, even
    // if a constructor path does not set it explicitly.
    bool GUARDED_BY(offloading_mutex_) enable_offloading_ = false;
    // Explicitly initialized: before C++20 a default-constructed
    // std::atomic<bool> is uninitialized, and reading it would be undefined
    // behavior. A constructor's member-initializer list still overrides this.
    std::atomic<bool> heartbeat_running_{false};
    std::thread heartbeat_thread_;
};

} // namespace mooncake
47 changes: 24 additions & 23 deletions mooncake-store/include/storage_backend.h
Original file line number Diff line number Diff line change
@@ -1,24 +1,18 @@
#pragma once

#include <glog/logging.h>

#include <filesystem>
#include <mutex>
#include <string>
#include <vector>
#include <filesystem>
#include "mutex.h"

#include "types.h"
#include "file_interface.h"
#include "mutex.h"
#include "types.h"

namespace mooncake {

struct StorageObjectMetadata {
int64_t bucket_id;
int64_t offset;
int64_t key_size;
int64_t data_size;
};

struct BucketObjectMetadata {
int64_t offset;
int64_t key_size;
Expand All @@ -29,14 +23,16 @@ YLT_REFL(BucketObjectMetadata, offset, key_size, data_size);
struct BucketMetadata {
int64_t meta_size;
int64_t data_size;
std::unordered_map<std::string, BucketObjectMetadata> object_metadata;
std::vector<std::string> keys;
std::vector<BucketObjectMetadata> metadatas;
};
YLT_REFL(BucketMetadata, data_size, object_metadata, keys);
YLT_REFL(BucketMetadata, data_size, keys, metadatas);

/**
 * @brief Pair of totals describing offloaded data: a key count and a size.
 * NOTE(review): total_size is presumably expressed in bytes - confirm.
 */
struct OffloadMetadata {
    int64_t total_keys;
    int64_t total_size;

    // `keys` arrives as an unsigned count; the cast spells out the
    // conversion into the signed field.
    OffloadMetadata(std::size_t keys, int64_t size)
        : total_keys{static_cast<int64_t>(keys)}, total_size{size} {}
};

enum class FileMode { Read, Write };
Expand Down Expand Up @@ -237,8 +233,9 @@ class BucketStorageBackend {
*/
tl::expected<int64_t, ErrorCode> BatchOffload(
const std::unordered_map<std::string, std::vector<Slice>>& batch_object,
std::function<ErrorCode(
const std::unordered_map<std::string, BucketObjectMetadata>&)>
std::function<
ErrorCode(const std::vector<std::string>& keys,
const std::vector<StorageObjectMetadata>& metadatas)>
complete_handler);

/**
Expand All @@ -261,7 +258,7 @@ class BucketStorageBackend {
* @return tl::expected<void, ErrorCode> indicating operation status.
*/
tl::expected<void, ErrorCode> BatchLoad(
std::unordered_map<std::string, Slice>& batched_slices);
const std::unordered_map<std::string, Slice>& batched_slices);

/**
* @brief Retrieves the list of object keys belonging to a specific bucket.
Expand Down Expand Up @@ -291,18 +288,19 @@ class BucketStorageBackend {
* @brief Iterate over the metadata of stored objects starting from a
* specified bucket.
* @param bucket_id The ID of the bucket to start scanning from.
* @param objects Output parameter: a map from object key to its metadata.
* @param buckets Output parameter: a list of bucket IDs encountered during
* iteration.
* @param limit Maximum number of objects to return in this iteration.
* @param keys Output vector to receive object keys
* @param metadatas Output vector to receive object metadata
* @param buckets Output vector to receive corresponding destination
* bucket IDs
* @param limit Maximum number of entries to return in this call
* @return tl::expected<int64_t, ErrorCode>
* - On success: the bucket ID where the next iteration should start (or 0
* if all data has been scanned).
* - On failure: returns an error code (e.g., BUCKET_NOT_FOUND, IO_ERROR).
*/
tl::expected<int64_t, ErrorCode> BucketScan(
int64_t bucket_id,
std::unordered_map<std::string, BucketObjectMetadata>& objects,
int64_t bucket_id, std::vector<std::string>& keys,
std::vector<StorageObjectMetadata>& metadatas,
std::vector<int64_t>& buckets, int64_t limit);

/**
Expand All @@ -314,8 +312,10 @@ class BucketStorageBackend {

private:
tl::expected<std::shared_ptr<BucketMetadata>, ErrorCode> BuildBucket(
int64_t bucket_id,
const std::unordered_map<std::string, std::vector<Slice>>& batch_object,
std::vector<iovec>& iovs);
std::vector<iovec>& iovs,
std::vector<StorageObjectMetadata>& metadatas);

tl::expected<void, ErrorCode> WriteBucket(
int64_t bucket_id, std::shared_ptr<BucketMetadata> bucket_metadata,
Expand All @@ -329,7 +329,8 @@ class BucketStorageBackend {

tl::expected<void, ErrorCode> BatchLoadBucket(
int64_t bucket_id, const std::vector<std::string>& keys,
std::unordered_map<std::string, Slice>& batched_slices);
const std::vector<StorageObjectMetadata>& metadatas,
const std::unordered_map<std::string, Slice>& batched_slices);

tl::expected<int64_t, ErrorCode> CreateBucketId();

Expand Down
11 changes: 10 additions & 1 deletion mooncake-store/include/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ enum class ErrorCode : int32_t {

BUCKET_NOT_FOUND = -1200, ///< Bucket not found.
BUCKET_ALREADY_EXISTS = -1201, ///< Bucket already exists.
KEYS_ULTRA_BUCKET_LIMIT = -1202, ///< Keys ultra bucket limit.
KEYS_EXCEED_BUCKET_LIMIT = -1202, ///< Keys exceed bucket limit.
KEYS_ULTRA_LIMIT = -1203, ///< Keys ultra limit.
UNABLE_OFFLOAD = -1300, ///< The offload functionality is not enabled
};

Expand Down Expand Up @@ -240,4 +241,12 @@ inline std::ostream& operator<<(std::ostream& os,
return os;
}

// Per-object location/size record used by the storage and offload APIs.
// NOTE(review): field meanings below are inferred from the names only -
// confirm against the storage backend implementation.
struct StorageObjectMetadata {
// Presumably the ID of the bucket that holds the object.
int64_t bucket_id;
// Presumably the object's offset within that bucket.
int64_t offset;
// Presumably the size of the object's key.
int64_t key_size;
// Presumably the size of the object's data.
int64_t data_size;
// Lists all four fields in the YLT_REFL macro invocation.
YLT_REFL(StorageObjectMetadata, bucket_id, offset, key_size, data_size);
};

} // namespace mooncake
1 change: 1 addition & 0 deletions mooncake-store/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ set(MOONCAKE_STORE_SOURCES
client_buffer.cpp
pybind_client.cpp
http_metadata_server.cpp
file_storage.cpp
)

set(EXTRA_LIBS "")
Expand Down
Loading
Loading