From 4f9a8672e50eca6ecfc00b995cb253130347ee8d Mon Sep 17 00:00:00 2001 From: yuyang Date: Wed, 5 Nov 2025 17:41:10 +0800 Subject: [PATCH 1/2] store builder python version --- mooncake-integration/store/store_py.cpp | 120 ++++++++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/mooncake-integration/store/store_py.cpp b/mooncake-integration/store/store_py.cpp index 20c66ee00..8ee41b1ae 100644 --- a/mooncake-integration/store/store_py.cpp +++ b/mooncake-integration/store/store_py.cpp @@ -29,6 +29,59 @@ std::vector> CastAddrs2Ptrs( return all_buffers; } } // namespace +// Forward declaration for builder return type +class MooncakeStorePyWrapper; +// A lightweight Python-side builder that collects parameters and +// invokes the existing setup() to initialize the store. +class MooncakeStoreBuilderPy { + public: + // Defaults aligned with MooncakeDistributedStore.setup signature + std::string local_hostname_ = ""; + std::string metadata_server_ = ""; + size_t global_segment_size_ = 1024 * 1024 * 16; // 16MB + size_t local_buffer_size_ = 1024 * 1024 * 16; // 16MB + std::string protocol_ = "tcp"; + std::string rdma_devices_ = ""; + std::string master_server_addr_ = "127.0.0.1:50051"; + py::object engine_ = py::none(); + + // Chainable setters + MooncakeStoreBuilderPy &local_hostname(const std::string &v) { + local_hostname_ = v; + return *this; + } + MooncakeStoreBuilderPy &metadata_server(const std::string &v) { + metadata_server_ = v; + return *this; + } + MooncakeStoreBuilderPy &global_segment_size(size_t v) { + global_segment_size_ = v; + return *this; + } + MooncakeStoreBuilderPy &local_buffer_size(size_t v) { + local_buffer_size_ = v; + return *this; + } + MooncakeStoreBuilderPy &protocol(const std::string &v) { + protocol_ = v; + return *this; + } + MooncakeStoreBuilderPy &rdma_devices(const std::string &v) { + rdma_devices_ = v; + return *this; + } + MooncakeStoreBuilderPy &master_server_addr(const std::string &v) { + master_server_addr_ = v; + return *this; + } + MooncakeStoreBuilderPy &engine(const py::object &obj) { + engine_ = obj; + return *this; + } + + // Build and return a ready-to-use store (defined after wrapper type) + MooncakeStorePyWrapper build() const; +}; // Python-specific wrapper functions that handle GIL and return pybind11 types class MooncakeStorePyWrapper { public: @@ -242,6 +295,33 @@ class MooncakeStorePyWrapper { } }; +// Define builder::build() now that MooncakeStorePyWrapper is complete +inline MooncakeStorePyWrapper MooncakeStoreBuilderPy::build() const { + MooncakeStorePyWrapper wrapper; + + // Convert optional engine + std::shared_ptr transfer_engine = nullptr; + if (!engine_.is_none()) { + transfer_engine = engine_.cast>(); + } + + // Execute setup with GIL released + int rc = 0; + { + py::gil_scoped_release release_gil; + rc = wrapper.store_->setup( + local_hostname_, metadata_server_, global_segment_size_, + local_buffer_size_, protocol_, rdma_devices_, + master_server_addr_, transfer_engine); + } + if (rc != 0) { + throw std::runtime_error( + std::string("Mooncake store setup failed, error code: ") + + std::to_string(rc)); + } + return wrapper; +} + PYBIND11_MODULE(store, m) { // Define the ReplicateConfig class py::class_(m, "ReplicateConfig") @@ -299,6 +379,11 @@ PYBIND11_MODULE(store, m) { // methods py::class_(m, "MooncakeDistributedStore") .def(py::init<>()) + // Provide a static builder() for convenience: MooncakeDistributedStore.builder() + .def_static( + "builder", + []() { return MooncakeStoreBuilderPy(); }, + "Create a new StoreBuilder to configure and build a store") .def( "setup", [](MooncakeStorePyWrapper &self, const std::string &local_hostname, @@ -610,6 +695,41 @@ PYBIND11_MODULE(store, m) { "multiple " "keys"); + // Expose Python-side StoreBuilder + py::class_(m, "StoreBuilder") + .def(py::init<>()) + .def("local_hostname", &MooncakeStoreBuilderPy::local_hostname, + py::return_value_policy::reference_internal, + "Set local hostname, e.g. 'host1' or 'host1:port'") + .def("metadata_server", &MooncakeStoreBuilderPy::metadata_server, + py::return_value_policy::reference_internal, + "Set metadata server connection string, e.g. '127.0.0.1:8080'") + .def("global_segment_size", + &MooncakeStoreBuilderPy::global_segment_size, + py::return_value_policy::reference_internal, + "Set total global segment size in bytes (for MountSegment)") + .def("local_buffer_size", &MooncakeStoreBuilderPy::local_buffer_size, + py::return_value_policy::reference_internal, + "Set size of local buffer allocator in bytes") + .def("protocol", &MooncakeStoreBuilderPy::protocol, + py::return_value_policy::reference_internal, + "Set transport protocol, e.g. 'tcp'|'rdma'|'ascend'") + .def("rdma_devices", &MooncakeStoreBuilderPy::rdma_devices, + py::return_value_policy::reference_internal, + "Set RDMA device names, e.g. 'mlx5_0' or ''") + .def("master_server_addr", &MooncakeStoreBuilderPy::master_server_addr, + py::return_value_policy::reference_internal, + "Set master server address, default '127.0.0.1:50051'") + .def("engine", &MooncakeStoreBuilderPy::engine, + py::return_value_policy::reference_internal, + "Set existing TransferEngine instance or None") + .def("build", &MooncakeStoreBuilderPy::build, + "Build and return a ready MooncakeDistributedStore instance"); + + // Module-level factory function: store.builder() + m.def("builder", []() { return MooncakeStoreBuilderPy(); }, + "Create a new StoreBuilder to configure and build a store"); + // Expose NUMA binding as a module-level function (no self required) m.def( "bind_to_numa_node", From 1abf0a94d99ab84fa2ce20af7e4ed96638480401 Mon Sep 17 00:00:00 2001 From: yuyang Date: Wed, 5 Nov 2025 18:45:46 +0800 Subject: [PATCH 2/2] 1. add python builder for mooncake store 2. add tests --- mooncake-integration/store/store_py.cpp | 73 ++++----- mooncake-store/tests/test_py_store_builder.py | 139 ++++++++++++++++++ 2 files changed, 176 insertions(+), 36 deletions(-) create mode 100644 mooncake-store/tests/test_py_store_builder.py diff --git a/mooncake-integration/store/store_py.cpp b/mooncake-integration/store/store_py.cpp index 8ee41b1ae..926d63cc5 100644 --- a/mooncake-integration/store/store_py.cpp +++ b/mooncake-integration/store/store_py.cpp @@ -309,10 +309,10 @@ inline MooncakeStorePyWrapper MooncakeStoreBuilderPy::build() const { int rc = 0; { py::gil_scoped_release release_gil; - rc = wrapper.store_->setup( - local_hostname_, metadata_server_, global_segment_size_, - local_buffer_size_, protocol_, rdma_devices_, - master_server_addr_, transfer_engine); + rc = wrapper.store_->setup(local_hostname_, metadata_server_, + global_segment_size_, local_buffer_size_, + protocol_, rdma_devices_, + master_server_addr_, transfer_engine); } if (rc != 0) { throw std::runtime_error( @@ -379,10 +379,10 @@ PYBIND11_MODULE(store, m) { // methods py::class_(m, "MooncakeDistributedStore") .def(py::init<>()) - // Provide a static builder() for convenience: MooncakeDistributedStore.builder() + // Provide a static builder() for convenience: + // MooncakeDistributedStore.builder() .def_static( - "builder", - []() { return MooncakeStoreBuilderPy(); }, + "builder", []() { return MooncakeStoreBuilderPy(); }, "Create a new StoreBuilder to configure and build a store") .def( "setup", @@ -697,37 +697,38 @@ PYBIND11_MODULE(store, m) { // Expose Python-side StoreBuilder py::class_(m, "StoreBuilder") - .def(py::init<>()) - .def("local_hostname", &MooncakeStoreBuilderPy::local_hostname, - py::return_value_policy::reference_internal, - "Set local hostname, e.g. 'host1' or 'host1:port'") - .def("metadata_server", &MooncakeStoreBuilderPy::metadata_server, - py::return_value_policy::reference_internal, - "Set metadata server connection string, e.g. '127.0.0.1:8080'") - .def("global_segment_size", - &MooncakeStoreBuilderPy::global_segment_size, - py::return_value_policy::reference_internal, - "Set total global segment size in bytes (for MountSegment)") - .def("local_buffer_size", &MooncakeStoreBuilderPy::local_buffer_size, - py::return_value_policy::reference_internal, - "Set size of local buffer allocator in bytes") - .def("protocol", &MooncakeStoreBuilderPy::protocol, - py::return_value_policy::reference_internal, - "Set transport protocol, e.g. 'tcp'|'rdma'|'ascend'") - .def("rdma_devices", &MooncakeStoreBuilderPy::rdma_devices, - py::return_value_policy::reference_internal, - "Set RDMA device names, e.g. 'mlx5_0' or ''") - .def("master_server_addr", &MooncakeStoreBuilderPy::master_server_addr, - py::return_value_policy::reference_internal, - "Set master server address, default '127.0.0.1:50051'") - .def("engine", &MooncakeStoreBuilderPy::engine, - py::return_value_policy::reference_internal, - "Set existing TransferEngine instance or None") - .def("build", &MooncakeStoreBuilderPy::build, - "Build and return a ready MooncakeDistributedStore instance"); + .def(py::init<>()) + .def("local_hostname", &MooncakeStoreBuilderPy::local_hostname, + py::return_value_policy::reference_internal, + "Set local hostname, e.g. 'host1' or 'host1:port'") + .def("metadata_server", &MooncakeStoreBuilderPy::metadata_server, + py::return_value_policy::reference_internal, + "Set metadata server connection string, e.g. '127.0.0.1:8080'") + .def("global_segment_size", + &MooncakeStoreBuilderPy::global_segment_size, + py::return_value_policy::reference_internal, + "Set total global segment size in bytes (for MountSegment)") + .def("local_buffer_size", &MooncakeStoreBuilderPy::local_buffer_size, + py::return_value_policy::reference_internal, + "Set size of local buffer allocator in bytes") + .def("protocol", &MooncakeStoreBuilderPy::protocol, + py::return_value_policy::reference_internal, + "Set transport protocol, e.g. 'tcp'|'rdma'|'ascend'") + .def("rdma_devices", &MooncakeStoreBuilderPy::rdma_devices, + py::return_value_policy::reference_internal, + "Set RDMA device names, e.g. 'mlx5_0' or ''") + .def("master_server_addr", &MooncakeStoreBuilderPy::master_server_addr, + py::return_value_policy::reference_internal, + "Set master server address, default '127.0.0.1:50051'") + .def("engine", &MooncakeStoreBuilderPy::engine, + py::return_value_policy::reference_internal, + "Set existing TransferEngine instance or None") + .def("build", &MooncakeStoreBuilderPy::build, + "Build and return a ready MooncakeDistributedStore instance"); // Module-level factory function: store.builder() - m.def("builder", []() { return MooncakeStoreBuilderPy(); }, + m.def( + "builder", []() { return MooncakeStoreBuilderPy(); }, "Create a new StoreBuilder to configure and build a store"); // Expose NUMA binding as a module-level function (no self required) diff --git a/mooncake-store/tests/test_py_store_builder.py b/mooncake-store/tests/test_py_store_builder.py new file mode 100644 index 000000000..6c18069b5 --- /dev/null +++ b/mooncake-store/tests/test_py_store_builder.py @@ -0,0 +1,139 @@ +import os +import sys +import time +import subprocess +import shutil +import unittest +from pathlib import Path + +# Ensure we can import the built pybind module 'store' +# It is produced at build/mooncake-integration/store*.so +REPO_ROOT = Path(__file__).resolve().parents[2] +# Prefer system libstdc++ to avoid GLIBCXX mismatches with conda envs +os.environ["LD_LIBRARY_PATH"] = \ + "/usr/lib/x86_64-linux-gnu:" + os.environ.get("LD_LIBRARY_PATH", "") +BUILD_INTEGRATION_DIR = REPO_ROOT / "build" / "mooncake-integration" +if BUILD_INTEGRATION_DIR.exists(): + sys.path.insert(0, str(BUILD_INTEGRATION_DIR)) + +STORE_IMPORT_ERROR = None +store = None +try: + import store # pybind module built from mooncake-integration +except Exception as e: + STORE_IMPORT_ERROR = ( + f"Cannot import 'store' from {BUILD_INTEGRATION_DIR}: {e}. " + f"LD_LIBRARY_PATH={os.environ.get('LD_LIBRARY_PATH','')}") + + +@unittest.skipIf(STORE_IMPORT_ERROR is not None, STORE_IMPORT_ERROR) +class StoreBuilderE2ETest(unittest.TestCase): + master_proc = None + metadata_url = "http://127.0.0.1:8080/metadata" + + @classmethod + def setUpClass(cls): + if shutil.which("mooncake_master") is None: + raise unittest.SkipTest("'mooncake_master' not found in PATH") + # Launch master with HTTP metadata server + # Use a small lease TTL to keep tests quick/consistent + cmd = [ + "mooncake_master", + "--default_kv_lease_ttl=500", + "--enable_http_metadata_server=true", + ] + cls.master_proc = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT + ) + # Give master a moment to start + time.sleep(1.0) + + @classmethod + def tearDownClass(cls): + if cls.master_proc is not None: + try: + cls.master_proc.terminate() + cls.master_proc.wait(timeout=5) + except Exception: + cls.master_proc.kill() + + def _basic_kv_cycle(self, st): + key = "py_builder_test_key" + value = b"hello_builder" + + # Put + rc = st.put(key, value) + self.assertEqual(rc, 0, f"put failed: rc={rc}") + + # Existence + exist = st.is_exist(key) + self.assertEqual(exist, 1, f"is_exist unexpected: {exist}") + + # Size + size = st.get_size(key) + self.assertEqual(size, len(value), f"get_size unexpected: {size}") + + # Get + got = st.get(key) + self.assertIsInstance(got, (bytes, bytearray)) + self.assertEqual(bytes(got), value) + + # Remove + rc = st.remove(key) + self.assertEqual(rc, 0, f"remove failed: rc={rc}") + + # Verify non-existence + exist = st.is_exist(key) + self.assertIn(exist, (0, -1)) # -1 may appear if metadata not yet visible + + # Close + st.close() + + def test_builder_with_all_parameters(self): + # Build store with all supported parameters explicitly specified + st = ( + store.builder() + .local_hostname("localhost") + .metadata_server(self.metadata_url) + .global_segment_size(16 * 1024 * 1024) + .local_buffer_size(16 * 1024 * 1024) + .protocol("tcp") + .rdma_devices("") + .master_server_addr("127.0.0.1:50051") + .engine(None) + .build() + ) + + # Sanity check hostname + host = st.get_hostname() + self.assertTrue(isinstance(host, str) and len(host) > 0) + + # Full KV cycle + self._basic_kv_cycle(st) + + def test_builder_minimal_parameters(self): + # Only set the required ones; other defaults should apply + st = ( + store.builder() + .local_hostname("localhost") + .metadata_server(self.metadata_url) + .build() + ) + self._basic_kv_cycle(st) + + def test_builder_zero_sizes(self): + # Zero global_segment_size and local_buffer_size should be supported + st = ( + store.builder() + .local_hostname("localhost") + .metadata_server(self.metadata_url) + .global_segment_size(0) + .local_buffer_size(0) + .protocol("tcp") + .build() + ) + self._basic_kv_cycle(st) + + +if __name__ == "__main__": + unittest.main(verbosity=2)