Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions mooncake-integration/store/store_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,59 @@ std::vector<std::vector<void *>> CastAddrs2Ptrs(
return all_buffers;
}
} // namespace
// Forward declaration for builder return type
class MooncakeStorePyWrapper;
// A lightweight Python-side builder that collects parameters and
// invokes the existing setup() to initialize the store.
class MooncakeStoreBuilderPy {
public:
// Defaults aligned with MooncakeDistributedStore.setup signature
std::string local_hostname_ = "";
std::string metadata_server_ = "";
size_t global_segment_size_ = 1024 * 1024 * 16; // 16MB
size_t local_buffer_size_ = 1024 * 1024 * 16; // 16MB
std::string protocol_ = "tcp";
std::string rdma_devices_ = "";
std::string master_server_addr_ = "127.0.0.1:50051";
py::object engine_ = py::none();

// Chainable setters
MooncakeStoreBuilderPy &local_hostname(const std::string &v) {
local_hostname_ = v;
return *this;
}
MooncakeStoreBuilderPy &metadata_server(const std::string &v) {
metadata_server_ = v;
return *this;
}
MooncakeStoreBuilderPy &global_segment_size(size_t v) {
global_segment_size_ = v;
return *this;
}
MooncakeStoreBuilderPy &local_buffer_size(size_t v) {
local_buffer_size_ = v;
return *this;
}
MooncakeStoreBuilderPy &protocol(const std::string &v) {
protocol_ = v;
return *this;
}
MooncakeStoreBuilderPy &rdma_devices(const std::string &v) {
rdma_devices_ = v;
return *this;
}
MooncakeStoreBuilderPy &master_server_addr(const std::string &v) {
master_server_addr_ = v;
return *this;
}
MooncakeStoreBuilderPy &engine(const py::object &obj) {
engine_ = obj;
return *this;
}

// Build and return a ready-to-use store (defined after wrapper type)
MooncakeStorePyWrapper build() const;
};
// Python-specific wrapper functions that handle GIL and return pybind11 types
class MooncakeStorePyWrapper {
public:
Expand Down Expand Up @@ -242,6 +295,33 @@ class MooncakeStorePyWrapper {
}
};

// Out-of-class definition of the builder's build(); it has to come after
// MooncakeStorePyWrapper so that the wrapper type is complete.
//
// Constructs a wrapper, runs setup() on its store with the collected
// parameters, and returns the wrapper. Throws std::runtime_error carrying the
// numeric code when setup() returns anything other than 0.
inline MooncakeStorePyWrapper MooncakeStoreBuilderPy::build() const {
    MooncakeStorePyWrapper wrapper;

    // The engine is optional: Python None maps to a null shared_ptr. The cast
    // touches a Python object, so it happens before the GIL is released.
    std::shared_ptr<TransferEngine> transfer_engine =
        engine_.is_none() ? nullptr
                          : engine_.cast<std::shared_ptr<TransferEngine>>();

    // Run setup() inside a scope that releases the GIL for its duration.
    int rc = [&] {
        py::gil_scoped_release release_gil;
        return wrapper.store_->setup(local_hostname_, metadata_server_,
                                     global_segment_size_, local_buffer_size_,
                                     protocol_, rdma_devices_,
                                     master_server_addr_, transfer_engine);
    }();

    if (rc != 0) {
        throw std::runtime_error("Mooncake store setup failed, error code: " +
                                 std::to_string(rc));
    }
    return wrapper;
}

PYBIND11_MODULE(store, m) {
// Define the ReplicateConfig class
py::class_<ReplicateConfig>(m, "ReplicateConfig")
Expand Down Expand Up @@ -299,6 +379,11 @@ PYBIND11_MODULE(store, m) {
// methods
py::class_<MooncakeStorePyWrapper>(m, "MooncakeDistributedStore")
.def(py::init<>())
// Provide a static builder() for convenience:
// MooncakeDistributedStore.builder()
.def_static(
"builder", []() { return MooncakeStoreBuilderPy(); },
"Create a new StoreBuilder to configure and build a store")
.def(
"setup",
[](MooncakeStorePyWrapper &self, const std::string &local_hostname,
Expand Down Expand Up @@ -610,6 +695,42 @@ PYBIND11_MODULE(store, m) {
"multiple "
"keys");

// Expose Python-side StoreBuilder
// Every setter bound below returns a reference to the builder it was called
// on (`return *this` in MooncakeStoreBuilderPy), which is what makes
// `builder().local_hostname(...).metadata_server(...).build()` chainable from
// Python. reference_internal is pybind11's policy for returned references
// whose lifetime is tied to the `self` argument of the call.
py::class_<MooncakeStoreBuilderPy>(m, "StoreBuilder")
.def(py::init<>())
.def("local_hostname", &MooncakeStoreBuilderPy::local_hostname,
py::return_value_policy::reference_internal,
"Set local hostname, e.g. 'host1' or 'host1:port'")
.def("metadata_server", &MooncakeStoreBuilderPy::metadata_server,
py::return_value_policy::reference_internal,
"Set metadata server connection string, e.g. '127.0.0.1:8080'")
.def("global_segment_size",
&MooncakeStoreBuilderPy::global_segment_size,
py::return_value_policy::reference_internal,
"Set total global segment size in bytes (for MountSegment)")
.def("local_buffer_size", &MooncakeStoreBuilderPy::local_buffer_size,
py::return_value_policy::reference_internal,
"Set size of local buffer allocator in bytes")
.def("protocol", &MooncakeStoreBuilderPy::protocol,
py::return_value_policy::reference_internal,
"Set transport protocol, e.g. 'tcp'|'rdma'|'ascend'")
.def("rdma_devices", &MooncakeStoreBuilderPy::rdma_devices,
py::return_value_policy::reference_internal,
"Set RDMA device names, e.g. 'mlx5_0' or ''")
.def("master_server_addr", &MooncakeStoreBuilderPy::master_server_addr,
py::return_value_policy::reference_internal,
"Set master server address, default '127.0.0.1:50051'")
.def("engine", &MooncakeStoreBuilderPy::engine,
py::return_value_policy::reference_internal,
"Set existing TransferEngine instance or None")
// build() returns the wrapper by value and throws std::runtime_error when
// setup() reports a non-zero code.
// NOTE(review): pybind11 is expected to surface that as a Python
// RuntimeError -- confirm against the pybind11 exception docs.
.def("build", &MooncakeStoreBuilderPy::build,
"Build and return a ready MooncakeDistributedStore instance");

// Module-level factory function: store.builder()
// Returns a default-constructed builder, same as the `builder` static
// registered on MooncakeDistributedStore.
m.def(
"builder", []() { return MooncakeStoreBuilderPy(); },
"Create a new StoreBuilder to configure and build a store");

// Expose NUMA binding as a module-level function (no self required)
m.def(
"bind_to_numa_node",
Expand Down
139 changes: 139 additions & 0 deletions mooncake-store/tests/test_py_store_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import os
import sys
import time
import subprocess
import shutil
import unittest
from pathlib import Path

# Ensure we can import the built pybind module 'store'
# It is produced at build/mooncake-integration/store*.so
# This file lives in <repo>/mooncake-store/tests/, so parents[2] of its
# resolved path is the repository root.
REPO_ROOT = Path(__file__).resolve().parents[2]
# Prefer system libstdc++ to avoid GLIBCXX mismatches with conda envs.
# The directory below is specific to Debian/Ubuntu on x86_64, so it is only
# added when it actually exists on this machine.
# NOTE(review): the dynamic loader reads LD_LIBRARY_PATH when a process
# starts, so this most likely only influences child processes (such as
# mooncake_master) and not the in-process `import store` -- confirm intent.
SYSTEM_LIB_DIR = "/usr/lib/x86_64-linux-gnu"


def prepend_library_dir(directory, current):
    """Return the library search path ``current`` with ``directory`` in front.

    Args:
        directory: Directory to search first.
        current: Existing colon-separated search path; may be empty.

    Returns:
        ``directory`` alone when ``current`` is empty, otherwise
        ``directory + ":" + current``. No trailing colon is ever produced,
        because an empty entry makes the loader search the current working
        directory.
    """
    if not current:
        return directory
    return directory + ":" + current


if os.path.isdir(SYSTEM_LIB_DIR):
    os.environ["LD_LIBRARY_PATH"] = prepend_library_dir(
        SYSTEM_LIB_DIR, os.environ.get("LD_LIBRARY_PATH", ""))
Comment on lines +13 to +14
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Hardcoding the path /usr/lib/x86_64-linux-gnu for LD_LIBRARY_PATH may reduce the portability of this test, as this path is specific to Debian-based distributions like Ubuntu. Consider making this more robust, for example by searching for libstdc++.so.6 in common system library directories, or at least adding a comment to explain why this specific path is needed for the test environment.

# Directory inside the build tree that holds the compiled 'store' extension.
BUILD_INTEGRATION_DIR = REPO_ROOT / "build" / "mooncake-integration"
if BUILD_INTEGRATION_DIR.exists():
    # Front of sys.path, so this directory is searched before any other entry.
    sys.path.insert(0, str(BUILD_INTEGRATION_DIR))

# When the import fails, `store` stays None and STORE_IMPORT_ERROR holds the
# reason; the test class uses that text as its skip message.
store = None
STORE_IMPORT_ERROR = None
try:
    import store  # pybind module built from mooncake-integration
except Exception as e:
    STORE_IMPORT_ERROR = (
        f"Cannot import 'store' from {BUILD_INTEGRATION_DIR}: {e}. "
        f"LD_LIBRARY_PATH={os.environ.get('LD_LIBRARY_PATH','')}")


@unittest.skipIf(STORE_IMPORT_ERROR is not None, STORE_IMPORT_ERROR)
class StoreBuilderE2ETest(unittest.TestCase):
    """End-to-end tests for the builder-style construction of a store.

    A ``mooncake_master`` process with the HTTP metadata server enabled is
    started once for the whole class. Each test builds a store through
    ``store.builder()`` and runs a put / exists / size / get / remove cycle.
    The class is skipped when the ``store`` extension cannot be imported or
    when ``mooncake_master`` is not on PATH.
    """

    master_proc = None
    metadata_url = "http://127.0.0.1:8080/metadata"

    @classmethod
    def setUpClass(cls):
        """Start the master process shared by every test in this class.

        Raises:
            unittest.SkipTest: ``mooncake_master`` is not on PATH.
            RuntimeError: the master exited during the startup wait.
        """
        if shutil.which("mooncake_master") is None:
            raise unittest.SkipTest("'mooncake_master' not found in PATH")
        # Launch master with HTTP metadata server
        # Use a small lease TTL to keep tests quick/consistent
        cmd = [
            "mooncake_master",
            "--default_kv_lease_ttl=500",
            "--enable_http_metadata_server=true",
        ]
        # The output is discarded: nothing in this test reads it, and a PIPE
        # that is never drained blocks the child once the pipe buffer fills.
        cls.master_proc = subprocess.Popen(
            cmd, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
        )
        # Give master a moment to start
        time.sleep(1.0)
        # Fail here with a clear message instead of letting every test fail
        # on a connection error when the master could not start.
        exit_code = cls.master_proc.poll()
        if exit_code is not None:
            cls.master_proc = None
            raise RuntimeError(
                f"mooncake_master exited during startup with code {exit_code}")

    @classmethod
    def tearDownClass(cls):
        """Stop the master process: terminate first, kill if that fails."""
        proc, cls.master_proc = cls.master_proc, None
        if proc is None:
            return
        try:
            proc.terminate()
            proc.wait(timeout=5)
        except Exception:
            proc.kill()
            # Reap the killed child so it does not linger as a zombie.
            proc.wait()

    def _basic_kv_cycle(self, st):
        """Run put/is_exist/get_size/get/remove on ``st``, then close it.

        The store is closed even when an assertion fails, so one failing test
        does not leave an open store behind for the following tests.
        """
        key = "py_builder_test_key"
        value = b"hello_builder"

        try:
            # Put
            rc = st.put(key, value)
            self.assertEqual(rc, 0, f"put failed: rc={rc}")

            # Existence
            exist = st.is_exist(key)
            self.assertEqual(exist, 1, f"is_exist unexpected: {exist}")

            # Size
            size = st.get_size(key)
            self.assertEqual(size, len(value), f"get_size unexpected: {size}")

            # Get
            got = st.get(key)
            self.assertIsInstance(got, (bytes, bytearray))
            self.assertEqual(bytes(got), value)

            # Remove
            rc = st.remove(key)
            self.assertEqual(rc, 0, f"remove failed: rc={rc}")

            # Verify non-existence
            exist = st.is_exist(key)
            # -1 may appear if metadata not yet visible
            self.assertIn(exist, (0, -1))
        finally:
            # Close
            st.close()

    def test_builder_with_all_parameters(self):
        """Build a store with every supported parameter set explicitly."""
        st = (
            store.builder()
            .local_hostname("localhost")
            .metadata_server(self.metadata_url)
            .global_segment_size(16 * 1024 * 1024)
            .local_buffer_size(16 * 1024 * 1024)
            .protocol("tcp")
            .rdma_devices("")
            .master_server_addr("127.0.0.1:50051")
            .engine(None)
            .build()
        )

        # Sanity check hostname. _basic_kv_cycle closes the store itself, so
        # close here only when this check fails before the cycle is reached.
        try:
            host = st.get_hostname()
            self.assertIsInstance(host, str)
            self.assertTrue(host)
        except Exception:
            st.close()
            raise

        # Full KV cycle
        self._basic_kv_cycle(st)

    def test_builder_minimal_parameters(self):
        """Set only hostname and metadata server; defaults cover the rest."""
        st = (
            store.builder()
            .local_hostname("localhost")
            .metadata_server(self.metadata_url)
            .build()
        )
        self._basic_kv_cycle(st)

    def test_builder_zero_sizes(self):
        """Zero global_segment_size and local_buffer_size should be supported."""
        st = (
            store.builder()
            .local_hostname("localhost")
            .metadata_server(self.metadata_url)
            .global_segment_size(0)
            .local_buffer_size(0)
            .protocol("tcp")
            .build()
        )
        self._basic_kv_cycle(st)


# Allow running this file directly; verbosity=2 prints one line per test.
if __name__ == "__main__":
    unittest.main(verbosity=2)
Loading