diff --git a/BUILD.bazel b/BUILD.bazel index c7b30fc61892..88827674f349 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -339,6 +339,7 @@ cc_library( "src/ray/object_manager/plasma/plasma_generated.h", "src/ray/object_manager/plasma/protocol.h", "src/ray/object_manager/plasma/shared_memory.h", + "src/ray/object_manager/plasma/object_store_client_interface.h", ] + select({ "@bazel_tools//src/conditions:windows": [ ], @@ -388,6 +389,7 @@ cc_library( "src/ray/object_manager/plasma/stats_collector.h", "src/ray/object_manager/plasma/store.h", "src/ray/object_manager/plasma/store_runner.h", + "src/ray/object_manager/plasma/object_store_runner_interface.h", "src/ray/thirdparty/dlmalloc.c", ], copts = PLASMA_COPTS, @@ -821,14 +823,19 @@ cc_library( "src/ray/core_worker/store_provider/*.h", "src/ray/core_worker/store_provider/memory_store/*.h", "src/ray/core_worker/transport/*.h", - ]), + ]) + [ + "src/ray/object_manager/plugin_manager.h", + ] + glob([ + "src/ray/object_manager/plasma/*.h", + ]) + , copts = COPTS, strip_include_prefix = "src", visibility = ["//visibility:public"], deps = [ ":gcs", ":gcs_client_lib", - ":plasma_client", + ":plugin_manager", ":ray_common", ":ray_util", ":raylet_client_lib", @@ -2369,13 +2376,33 @@ cc_test( "@com_google_googletest//:gtest_main", ], ) - +cc_library( + name = "plugin_manager", + srcs = [ + "src/ray/object_manager/plugin_manager.cc", + ], + hdrs = [ + "src/ray/object_manager/plugin_manager.h", + ], + copts = PLASMA_COPTS, + linkopts = PLASMA_LINKOPTS, + strip_include_prefix = "src", + deps = [ + ":plasma_client", + ":plasma_store_server_lib", + ], +) cc_library( name = "object_manager", - srcs = glob([ - "src/ray/object_manager/*.cc", - "src/ray/object_manager/notification/*.cc", - ]), + srcs = glob( + [ + "src/ray/object_manager/*.cc", + "src/ray/object_manager/notification/*.cc", + ], + exclude = [ + "src/ray/object_manager/plugin_manager.cc", + ], + ), hdrs = glob([ "src/ray/object_manager/*.h", "src/ray/object_manager/notification/*.h", @@ -2387,6 +2414,7 @@ cc_library( ":gcs", ":object_manager_rpc", ":plasma_store_server_lib", + ":plugin_manager", ":ray_common", ":ray_util", "@boost//:asio", @@ -3017,3 +3045,4 @@ genrule( """, local = 1, ) + diff --git a/cpp/BUILD.bazel b/cpp/BUILD.bazel index d21601878e47..b1a9bbace645 100644 --- a/cpp/BUILD.bazel +++ b/cpp/BUILD.bazel @@ -420,3 +420,4 @@ py_test( }, tags = ["team:core"], ) + diff --git a/cpp/src/ray/config_internal.cc b/cpp/src/ray/config_internal.cc index 8f98636b6f8f..736c60021f83 100644 --- a/cpp/src/ray/config_internal.cc +++ b/cpp/src/ray/config_internal.cc @@ -90,6 +90,21 @@ ABSL_FLAG(std::string, "The namespace of job. If not set," " a unique value will be randomly generated."); +ABSL_FLAG(std::string, + ray_plugin_name, + "default", + "The name of the object store plugin."); + +ABSL_FLAG(std::string, + ray_plugin_path, + "", + "The path to the object store plugin library."); + +ABSL_FLAG(std::string, + ray_plugin_params, + "{}", + "The paramters input of the object store plugin."); + using json = nlohmann::json; namespace ray { @@ -170,7 +185,17 @@ void ConfigInternal::Init(RayConfig &config, int argc, char **argv) { if (!FLAGS_ray_node_ip_address.CurrentValue().empty()) { node_ip_address = FLAGS_ray_node_ip_address.CurrentValue(); } + if (!FLAGS_ray_plugin_name.CurrentValue().empty()){ + plugin_name = FLAGS_ray_plugin_name.CurrentValue(); + } + if (!FLAGS_ray_plugin_path.CurrentValue().empty()){ + plugin_path = FLAGS_ray_plugin_path.CurrentValue(); + } + if (!FLAGS_ray_plugin_params.CurrentValue().empty()){ + plugin_params = FLAGS_ray_plugin_params.CurrentValue(); + } if (!FLAGS_ray_head_args.CurrentValue().empty()) { + RAY_LOG(INFO) << FLAGS_ray_head_args.CurrentValue(); std::vector args = absl::StrSplit(FLAGS_ray_head_args.CurrentValue(), ' ', absl::SkipEmpty()); head_args.insert(head_args.end(), args.begin(), args.end()); @@ -251,3 +276,4 @@ void ConfigInternal::UpdateSessionDir(const std::string dir) { } } // namespace internal } // namespace ray + diff --git a/cpp/src/ray/config_internal.h b/cpp/src/ray/config_internal.h index a73533417761..6d6b2f7be5ff 100644 --- a/cpp/src/ray/config_internal.h +++ b/cpp/src/ray/config_internal.h @@ -61,6 +61,12 @@ class ConfigInternal { std::vector head_args = {}; + std::string plugin_name = "default"; + + std::string plugin_path = ""; + + std::string plugin_params = "{}"; + boost::optional runtime_env; int runtime_env_hash = 0; @@ -94,3 +100,4 @@ class ConfigInternal { } // namespace internal } // namespace ray + diff --git a/cpp/src/ray/util/process_helper.cc b/cpp/src/ray/util/process_helper.cc index 26918e5b04f2..5a3fdb431148 100644 --- a/cpp/src/ray/util/process_helper.cc +++ b/cpp/src/ray/util/process_helper.cc @@ -91,7 +91,7 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback) node_ip = GetNodeIpAddress(); } } - + std::unique_ptr global_state_accessor = CreateGlobalStateAccessor(bootstrap_address); if (ConfigInternal::Instance().worker_type == WorkerType::DRIVER) { @@ -129,6 +129,9 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback) options.job_id = global_state_accessor->GetNextJobID(); } } + options.plugin_name = ConfigInternal::Instance().plugin_name; + options.plugin_path = ConfigInternal::Instance().plugin_path; + options.plugin_params = ConfigInternal::Instance().plugin_params; options.gcs_options = gcs_options; options.enable_logging = true; options.log_dir = ConfigInternal::Instance().logs_dir; @@ -174,3 +177,4 @@ void ProcessHelper::RayStop() { } // namespace internal } // namespace ray + diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 5fb963e05cd9..0b40fa107bda 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -33,7 +33,6 @@ # using logging.basicConfig in its entry/init points. logger = logging.getLogger(__name__) - class Node: """An encapsulation of the Ray processes on a single node. @@ -285,6 +284,38 @@ def __init__( if head: self.start_head_processes() + if ray_params.plugin_name: + self._plugin_name = ray_params.plugin_name + os.environ[ray_constants.RAY_PLUGIN_NAME_ENVIRONMENT_VARIABLE] = ray_params.plugin_name + else: + env_plugin_name = os.environ.get(ray_constants.RAY_PLUGIN_NAME_ENVIRONMENT_VARIABLE) + if env_plugin_name is not None and env_plugin_name != "": + self._plugin_name = env_plugin_name + else: + self._plugin_name = "default" + + + if ray_params.plugin_path: + self._plugin_path = ray_params.plugin_path + os.environ[ray_constants.RAY_PLUGIN_PATH_ENVIRONMENT_VARIABLE] = ray_params.plugin_path + else: + env_plugin_path = os.environ.get(ray_constants.RAY_PLUGIN_PATH_ENVIRONMENT_VARIABLE) + if env_plugin_path is not None and env_plugin_path != "": + self._plugin_path = env_plugin_path + else: + self._plugin_path = "" + + + if ray_params.plugin_params: + self._plugin_params_str = json.dumps(ray_params.plugin_params) + os.environ[ray_constants.RAY_PLUGIN_PARAMS_ENVIRONMENT_VARIABLE_STR] = ray_params.plugin_params + else: + env_plugin_params_str = os.environ.get(ray_constants.RAY_PLUGIN_PARAMS_ENVIRONMENT_VARIABLE_STR) + if env_plugin_params_str is not None and env_plugin_params_str != "": + self._plugin_params_str = env_plugin_params_str + else: + self._plugin_params_str = "{}" + if not connect_only: self.start_ray_processes() # we should update the address info after the node has been started @@ -987,6 +1018,9 @@ def start_raylet( self, plasma_directory: str, object_store_memory: int, + plugin_name: str, + plugin_path: str, + plugin_params: str, use_valgrind: bool = False, use_profiler: bool = False, ): @@ -1016,6 +1050,9 @@ def start_raylet( self.get_resource_spec(), plasma_directory, object_store_memory, + plugin_name, + plugin_path, + plugin_params, self.session_name, is_head_node=self.is_head(), min_worker_port=self._ray_params.min_worker_port, @@ -1211,7 +1248,10 @@ def start_ray_processes(self): plasma_directory=self._ray_params.plasma_directory, huge_pages=self._ray_params.huge_pages, ) - self.start_raylet(plasma_directory, object_store_memory) + + self.start_raylet(plasma_directory, object_store_memory, self._plugin_name, + self._plugin_path, + self._plugin_params_str) if self._ray_params.include_log_monitor: self.start_log_monitor() @@ -1580,3 +1620,4 @@ def _record_stats(self): "redis" if os.environ.get("RAY_REDIS_ADDRESS") is not None else "memory" ) record_extra_usage_tag(TagKey.GCS_STORAGE, gcs_storage_type) + diff --git a/python/ray/_private/parameter.py b/python/ray/_private/parameter.py index e89b9f9216d0..19326f6a8295 100644 --- a/python/ray/_private/parameter.py +++ b/python/ray/_private/parameter.py @@ -122,6 +122,9 @@ class RayParams: def __init__( self, + plugin_name: Optional[str] = None, + plugin_path: Optional[str] = None, + plugin_params: Optional[Dict[str, any]] = None, redis_address: Optional[str] = None, gcs_address: Optional[str] = None, num_cpus: Optional[int] = None, @@ -177,6 +180,9 @@ def __init__( session_name: Optional[str] = None, webui: Optional[str] = None, ): + self.plugin_name = plugin_name + self.plugin_path = plugin_path + self.plugin_params = plugin_params self.redis_address = redis_address self.gcs_address = gcs_address self.num_cpus = num_cpus @@ -438,3 +444,4 @@ def _format_ports(self, pre_selected_ports): port_range_str = f"from {min_port} to {max_port}" ports[comp] = f"{len(port_list)} ports {port_range_str}" return ports + diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index d3e21046a693..bc81233bf932 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -1,4 +1,4 @@ -"""Ray constants used in the Python code.""" +f"""Ray constants used in the Python code.""" import logging import os @@ -425,3 +425,7 @@ def gcs_actor_scheduling_enabled(): } RAY_ENABLE_RECORD_TASK_LOGGING = env_bool("RAY_ENABLE_RECORD_TASK_LOGGING", False) + +RAY_PLUGIN_NAME_ENVIRONMENT_VARIABLE = "RAY_PLUGIN_NAME" +RAY_PLUGIN_PATH_ENVIRONMENT_VARIABLE = "RAY_PLUGIN_PATH" +RAY_PLUGIN_PARAMS_ENVIRONMENT_VARIABLE_STR = "RAY_PLUGIN_PARAMS_STR" diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index 558762681f90..d67f4f192498 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -1356,6 +1356,9 @@ def start_raylet( resource_spec, plasma_directory: str, object_store_memory: int, + plugin_name: str, + plugin_path: str, + plugin_params: str, session_name: str, is_head_node: bool, min_worker_port: Optional[int] = None, @@ -1602,12 +1605,14 @@ def start_raylet( f"--metrics-agent-port={metrics_agent_port}", f"--metrics_export_port={metrics_export_port}", f"--object_store_memory={object_store_memory}", + f"--plugin_name={plugin_name}", + f"--plugin_path={plugin_path}", + f"--plugin_params={plugin_params}", f"--plasma_directory={plasma_directory}", f"--ray-debugger-external={1 if ray_debugger_external else 0}", f"--gcs-address={gcs_address}", f"--session-name={session_name}", ] - if is_head_node: command.append("--head") @@ -1637,7 +1642,6 @@ def start_raylet( fate_share=fate_share, env_updates=env_updates, ) - return process_info @@ -2018,3 +2022,4 @@ def start_ray_client_server( fate_share=fate_share, ) return process_info + diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 4389510c7d21..3082d601c47d 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -1118,6 +1118,9 @@ def init( num_cpus: Optional[int] = None, num_gpus: Optional[int] = None, resources: Optional[Dict[str, float]] = None, + plugin_name: Optional[str] = None, + plugin_path: Optional[str] = None, + plugin_params: Optional[Dict[str, Any]] = None, object_store_memory: Optional[int] = None, local_mode: bool = False, ignore_reinit_error: bool = False, @@ -1197,6 +1200,9 @@ def init( raylet. By default, this is set based on detected GPUs. resources: A dictionary mapping the names of custom resources to the quantities for them available. + plugin_name: The name of the object store plugin. + plugin_path: The path to the object store plugin shared library. + plugin_params: The parameters of the object store plugin. object_store_memory: The amount of memory (in bytes) to start the object store with. By default, this is automatically set based on available system memory. @@ -1496,6 +1502,9 @@ def init( dashboard_host=dashboard_host, dashboard_port=dashboard_port, memory=_memory, + plugin_name = plugin_name, + plugin_path = plugin_path, + plugin_params = plugin_params, object_store_memory=object_store_memory, redis_max_memory=_redis_max_memory, plasma_store_socket_name=None, @@ -1559,6 +1568,9 @@ def init( ray_params = ray._private.parameter.RayParams( node_ip_address=node_ip_address, raylet_ip_address=raylet_ip_address, + plugin_name = plugin_name, + plugin_path = plugin_path, + plugin_params = plugin_params, gcs_address=gcs_address, redis_address=redis_address, redis_password=_redis_password, @@ -1616,7 +1628,7 @@ def init( logger.info(info_str) connect( - _global_node, + _global_node, ## A node _global_node.session_name, mode=driver_mode, log_to_driver=log_to_driver, @@ -2177,6 +2189,8 @@ def connect( logs_dir = "" else: logs_dir = node.get_logs_dir_path() + + worker.core_worker = ray._raylet.CoreWorker( mode, node.plasma_store_socket_name, @@ -2191,6 +2205,9 @@ def connect( driver_name, log_stdout_file_path, log_stderr_file_path, + node._plugin_name, + node._plugin_path, + node._plugin_params_str, serialized_job_config, node.metrics_agent_port, runtime_env_hash, @@ -3231,3 +3248,4 @@ def remote( return _make_remote(args[0], {}) assert len(args) == 0 and len(kwargs) > 0, ray_option_utils.remote_args_error_string return functools.partial(_make_remote, options=kwargs) + diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 89a80aff8bef..f7e7ede8c4bb 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1733,6 +1733,7 @@ cdef class CoreWorker: JobID job_id, GcsClientOptions gcs_options, log_dir, node_ip_address, node_manager_port, raylet_ip_address, local_mode, driver_name, stdout_file, stderr_file, + plugin_name, plugin_path, plugin_params, serialized_job_config, metrics_agent_port, runtime_env_hash, startup_token, session_name, entrypoint, worker_launch_time_ms, worker_launched_time_ms): @@ -1769,6 +1770,9 @@ cdef class CoreWorker: options.driver_name = driver_name options.stdout_file = stdout_file options.stderr_file = stderr_file + options.plugin_name = plugin_name + options.plugin_path = plugin_path + options.plugin_params = plugin_params options.task_execution_callback = task_execution_handler options.check_signals = check_signals options.gc_collect = gc_collect @@ -3047,3 +3051,4 @@ cdef void async_callback(shared_ptr[CRayObject] obj, def del_key_from_storage(host, port, password, use_ssl, key): return RedisDelKeySync(host, port, password, use_ssl, key) + diff --git a/python/ray/cluster_utils.py b/python/ray/cluster_utils.py index 15bec6c13913..c8ac0aa8fe0d 100644 --- a/python/ray/cluster_utils.py +++ b/python/ray/cluster_utils.py @@ -365,3 +365,4 @@ def shutdown(self): ray.experimental.internal_kv._internal_kv_reset() # Delete the cluster address. ray._private.utils.reset_ray_address() + diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 41e29f58012e..46f26940ed90 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -287,6 +287,9 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: c_string driver_name c_string stdout_file c_string stderr_file + c_string plugin_name + c_string plugin_path + c_string plugin_params (CRayStatus( const CAddress &caller_address, CTaskType task_type, @@ -352,3 +355,4 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: @staticmethod void RunTaskExecutionLoop() + diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 6cb85e53704d..a63c87c57abe 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -366,6 +366,24 @@ def debug(address): help="The amount of memory (in bytes) to start the object store with. " "By default, this is capped at 20GB but can be set higher.", ) +@click.option( + "--plugin-name", + required=False, + default=None, + help="The name of the plugin object store.", +) +@click.option( + "--plugin-path", + required=False, + default=None, + help="The full path of the library if the plugin is a shared library.", +) +@click.option( + "--plugin-params", + required=False, + default=None, + help="The objectstore startup parameters as a series of key-value pairs.", +) @click.option( "--redis-max-memory", required=False, @@ -542,6 +560,9 @@ def start( ray_client_server_port, memory, object_store_memory, + plugin_name, + plugin_path, + plugin_params, redis_max_memory, num_cpus, num_gpus, @@ -620,6 +641,9 @@ def start( object_manager_port=object_manager_port, node_manager_port=node_manager_port, memory=memory, + plugin_name=plugin_name, + plugin_path=plugin_path, + plugin_params=plugin_params, object_store_memory=object_store_memory, redis_password=redis_password, redirect_output=redirect_output, @@ -2450,3 +2474,4 @@ def main(): if __name__ == "__main__": main() + diff --git a/src/ray/common/test_util.cc b/src/ray/common/test_util.cc index 853c8102694e..2134be56c389 100644 --- a/src/ray/common/test_util.cc +++ b/src/ray/common/test_util.cc @@ -145,6 +145,9 @@ std::string TestSetupUtil::StartRaylet(const std::string &node_ip_address, "--node_ip_address=" + node_ip_address, "--min-worker-port=0", "--max-worker-port=0", + "--plugin-name=default", + "--plugin-path=", + "--plugin-params={}", "--maximum_startup_concurrency=10", "--static_resource_list=" + resource, "--python_worker_command=" + mock_worker_command, @@ -265,3 +268,4 @@ std::string TEST_RAYLET_EXEC_PATH; std::string TEST_MOCK_WORKER_EXEC_PATH; } // namespace ray + diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 2b001719a72b..4f20fe7c93cd 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -302,9 +302,12 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ } }; RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeToNodeChange(on_node_change, nullptr)); - + plasma_store_provider_.reset(new CoreWorkerPlasmaStoreProvider( options_.store_socket, + options_.plugin_name, + options_.plugin_path, + options_.plugin_params, local_raylet_client_, reference_counter_, options_.check_signals, @@ -3954,3 +3957,4 @@ void ClusterSizeBasedLeaseRequestRateLimiter::OnNodeChanges( } // namespace core } // namespace ray + diff --git a/src/ray/core_worker/core_worker_options.h b/src/ray/core_worker/core_worker_options.h index 157a3fbc53a3..5e104151b135 100644 --- a/src/ray/core_worker/core_worker_options.h +++ b/src/ray/core_worker/core_worker_options.h @@ -71,6 +71,9 @@ struct CoreWorkerOptions { driver_name(""), stdout_file(""), stderr_file(""), + plugin_name(""), + plugin_path(""), + plugin_params(""), task_execution_callback(nullptr), check_signals(nullptr), gc_collect(nullptr), @@ -124,6 +127,12 @@ struct CoreWorkerOptions { std::string stdout_file; /// The stderr file of this process. std::string stderr_file; + /// The name of the plugin. + std::string plugin_name; + /// The path to the plugin shared library. + std::string plugin_path; + /// The parameters of the plugin. + std::string plugin_params; /// Language worker callback to execute tasks. TaskExecutionCallback task_execution_callback; /// The callback to be called when shutting down a `CoreWorker` instance. @@ -191,3 +200,4 @@ struct CoreWorkerOptions { }; } // namespace core } // namespace ray + diff --git a/src/ray/core_worker/core_worker_process.cc b/src/ray/core_worker/core_worker_process.cc index c48037ac904b..7d97a8610ef4 100644 --- a/src/ray/core_worker/core_worker_process.cc +++ b/src/ray/core_worker/core_worker_process.cc @@ -291,3 +291,4 @@ std::shared_ptr CoreWorkerProcessImpl::GetCoreWorker() const { } // namespace core } // namespace ray + diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index 827678edc0f8..88d0075dbfd9 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -18,6 +18,7 @@ #include "ray/core_worker/context.h" #include "ray/core_worker/core_worker.h" #include "src/ray/protobuf/gcs.pb.h" +#include "ray/object_manager/plugin_manager.h" namespace ray { namespace core { @@ -55,6 +56,9 @@ BufferTracker::UsedObjects() const { CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider( const std::string &store_socket, + const std::string &plugin_name, + const std::string &plugin_path, + const std::string &plugin_params, const std::shared_ptr raylet_client, const std::shared_ptr reference_counter, std::function check_signals, @@ -63,6 +67,11 @@ CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider( : raylet_client_(raylet_client), reference_counter_(reference_counter), check_signals_(check_signals) { + + ray::PluginManager& plugin_manager = ray::PluginManager::GetInstance(); + plugin_manager.SetObjectStoreClients(plugin_name, plugin_path, plugin_params); + store_client_ = plugin_manager.CreateObjectStoreClientInstance(plugin_name); + if (get_current_call_site != nullptr) { get_current_call_site_ = get_current_call_site; } else { @@ -70,14 +79,14 @@ CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider( } object_store_full_delay_ms_ = RayConfig::instance().object_store_full_delay_ms(); buffer_tracker_ = std::make_shared(); - RAY_CHECK_OK(store_client_.Connect(store_socket)); + RAY_CHECK_OK(store_client_->Connect(store_socket,"")); if (warmup) { RAY_CHECK_OK(WarmupStore()); } } CoreWorkerPlasmaStoreProvider::~CoreWorkerPlasmaStoreProvider() { - RAY_IGNORE_EXPR(store_client_.Disconnect()); + RAY_IGNORE_EXPR(store_client_->Disconnect()); } Status CoreWorkerPlasmaStoreProvider::Put(const RayObject &object, @@ -96,7 +105,8 @@ Status CoreWorkerPlasmaStoreProvider::Put(const RayObject &object, // not throw an error. if (data != nullptr) { if (object.HasData()) { - memcpy(data->Data(), object.GetData()->Data(), object.GetData()->Size()); + //memcpy(data->Data(), object.GetData()->Data(), object.GetData()->Size()); + store_client_->MemCpy(data->Data(), object.GetData()->Data(), object.GetData()->Size()); } RAY_RETURN_NOT_OK(Seal(object_id)); if (object_exists) { @@ -119,7 +129,7 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr &meta source = plasma::flatbuf::ObjectSource::RestoredFromStorage; } Status status = - store_client_.CreateAndSpillIfNeeded(object_id, + store_client_->CreateAndSpillIfNeeded(object_id, owner_address, data_size, metadata ? metadata->Data() : nullptr, @@ -154,11 +164,11 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr &meta } Status CoreWorkerPlasmaStoreProvider::Seal(const ObjectID &object_id) { - return store_client_.Seal(object_id); + return store_client_->Seal(object_id); } Status CoreWorkerPlasmaStoreProvider::Release(const ObjectID &object_id) { - return store_client_.Release(object_id); + return store_client_->Release(object_id); } Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore( @@ -179,7 +189,7 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore( task_id)); std::vector plasma_results; - RAY_RETURN_NOT_OK(store_client_.Get(batch_ids, + RAY_RETURN_NOT_OK(store_client_->Get(batch_ids, timeout_ms, &plasma_results, /*is_from_worker=*/true)); @@ -220,7 +230,7 @@ Status CoreWorkerPlasmaStoreProvider::GetIfLocal( absl::flat_hash_map> *results) { std::vector plasma_results; // Since this path is used only for spilling, we should set is_from_worker: false. - RAY_RETURN_NOT_OK(store_client_.Get(object_ids, + RAY_RETURN_NOT_OK(store_client_->Get(object_ids, /*timeout_ms=*/0, &plasma_results, /*is_from_worker=*/false)); @@ -269,6 +279,14 @@ Status CoreWorkerPlasmaStoreProvider::Get( const WorkerContext &ctx, absl::flat_hash_map> *results, bool *got_exception) { + if (store_client_->IsGlobalShm()) { + std::vector obj_list; + for (const auto& id : object_ids) { + obj_list.emplace_back(id); + } + return GetIfLocal(obj_list, results); + } + int64_t batch_size = RayConfig::instance().worker_fetch_request_size(); std::vector batch_ids; absl::flat_hash_set remaining(object_ids.begin(), object_ids.end()); @@ -370,7 +388,7 @@ Status CoreWorkerPlasmaStoreProvider::Get( Status CoreWorkerPlasmaStoreProvider::Contains(const ObjectID &object_id, bool *has_object) { - return store_client_.Contains(object_id, has_object); + return store_client_->Contains(object_id, has_object); } Status CoreWorkerPlasmaStoreProvider::Wait( @@ -430,7 +448,7 @@ Status CoreWorkerPlasmaStoreProvider::Delete( } std::string CoreWorkerPlasmaStoreProvider::MemoryUsageString() { - return store_client_.DebugString(); + return store_client_->DebugString(); } absl::flat_hash_map> @@ -483,3 +501,5 @@ Status CoreWorkerPlasmaStoreProvider::WarmupStore() { } // namespace core } // namespace ray + + diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h index 4bcfad197b1f..179afccb0598 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.h +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -23,6 +23,7 @@ #include "ray/core_worker/context.h" #include "ray/core_worker/reference_count.h" #include "ray/object_manager/plasma/client.h" +#include "ray/object_manager/plasma/object_store_client_interface.h" #include "ray/raylet_client/raylet_client.h" namespace ray { @@ -89,6 +90,9 @@ class CoreWorkerPlasmaStoreProvider { public: CoreWorkerPlasmaStoreProvider( const std::string &store_socket, + const std::string &plugin_name, + const std::string &plugin_path, + const std::string &plugin_params, const std::shared_ptr raylet_client, const std::shared_ptr reference_counter, std::function check_signals, @@ -220,7 +224,8 @@ class CoreWorkerPlasmaStoreProvider { Status WarmupStore(); const std::shared_ptr raylet_client_; - plasma::PlasmaClient store_client_; + //plasma::PlasmaClient store_client_; + std::shared_ptr store_client_; /// Used to look up a plasma object's owner. const std::shared_ptr reference_counter_; std::function check_signals_; @@ -232,3 +237,4 @@ class CoreWorkerPlasmaStoreProvider { } // namespace core } // namespace ray + diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index cba46a14d733..a70c4b084067 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -137,6 +137,9 @@ class CoreWorkerTest : public ::testing::Test { options.enable_logging = true; options.install_failure_signal_handler = true; options.node_ip_address = "127.0.0.1"; + options.plugin_name = "default"; + options.plugin_path = ""; + options.plugin_params = "{}"; options.node_manager_port = node_manager_port; options.raylet_ip_address = "127.0.0.1"; options.driver_name = "core_worker_test"; @@ -1080,3 +1083,4 @@ int main(int argc, char **argv) { return RUN_ALL_TESTS(); } + diff --git a/src/ray/core_worker/test/mock_worker.cc b/src/ray/core_worker/test/mock_worker.cc index 1c782438ae28..c563337b83c1 100644 --- a/src/ray/core_worker/test/mock_worker.cc +++ b/src/ray/core_worker/test/mock_worker.cc @@ -44,6 +44,9 @@ class MockWorker { options.store_socket = store_socket; options.raylet_socket = raylet_socket; options.gcs_options = gcs_options; + options.plugin_name = "default"; + options.plugin_path = ""; + options.plugin_params = "{}"; options.enable_logging = true; options.install_failure_signal_handler = true; options.node_ip_address = "127.0.0.1"; @@ -189,3 +192,4 @@ int main(int argc, char **argv) { worker.RunTaskExecutionLoop(); return 0; } + diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index 8004fb588811..2d29def00255 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -21,7 +21,7 @@ namespace ray { ObjectBufferPool::ObjectBufferPool( - std::shared_ptr store_client, uint64_t chunk_size) + std::shared_ptr store_client, uint64_t chunk_size) : store_client_(store_client), default_chunk_size_(chunk_size) {} ObjectBufferPool::~ObjectBufferPool() { @@ -136,7 +136,8 @@ void ObjectBufferPool::WriteChunk(const ObjectID &object_id, RAY_CHECK(data.size() == chunk_info.buffer_length) << "size mismatch! data size: " << data.size() << " chunk size: " << chunk_info.buffer_length; - std::memcpy(chunk_info.data, data.data(), chunk_info.buffer_length); + //std::memcpy(chunk_info.data, data.data(), chunk_info.buffer_length); + store_client_->MemCpy(chunk_info.data, data.data(), chunk_info.buffer_length); it->second.chunk_state.at(chunk_index) = CreateChunkState::SEALED; it->second.num_seals_remaining--; if (it->second.num_seals_remaining == 0) { @@ -301,3 +302,5 @@ std::string ObjectBufferPool::DebugString() const { } } // namespace ray + + diff --git a/src/ray/object_manager/object_buffer_pool.h b/src/ray/object_manager/object_buffer_pool.h index e514b2d72516..546d1bdc7243 100644 --- a/src/ray/object_manager/object_buffer_pool.h +++ b/src/ray/object_manager/object_buffer_pool.h @@ -27,7 +27,8 @@ #include "ray/common/id.h" #include "ray/common/status.h" #include "ray/object_manager/memory_object_reader.h" -#include "ray/object_manager/plasma/client.h" +#include "ray/object_manager/plasma/object_store_client_interface.h" +#include "ray/object_manager/plasma/object_store_runner_interface.h" namespace ray { @@ -60,7 +61,7 @@ class ObjectBufferPool { /// /// \param store_client Plasma store client. Used for testing purposes only. /// \param chunk_size The chunk size into which objects are to be split. - ObjectBufferPool(std::shared_ptr store_client, + ObjectBufferPool(std::shared_ptr store_client, const uint64_t chunk_size); ~ObjectBufferPool(); @@ -214,7 +215,7 @@ class ObjectBufferPool { GUARDED_BY(pool_mutex_); /// Plasma client pool. - std::shared_ptr store_client_; + std::shared_ptr store_client_; /// Determines the maximum chunk size to be transferred by a single thread. const uint64_t default_chunk_size_; @@ -223,3 +224,4 @@ class ObjectBufferPool { }; } // namespace ray + diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 18d3e76eb297..bd0acdd4ab5f 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -21,23 +21,24 @@ #include "ray/util/util.h" namespace asio = boost::asio; +using json = nlohmann::json; + namespace ray { ObjectStoreRunner::ObjectStoreRunner(const ObjectManagerConfig &config, + std::unique_ptr store_runner, SpillObjectsCallback spill_objects_callback, std::function object_store_full_callback, AddObjectCallback add_object_callback, DeleteObjectCallback delete_object_callback) { - plasma::plasma_store_runner.reset( - new plasma::PlasmaStoreRunner(config.store_socket_name, - config.object_store_memory, - config.huge_pages, - config.plasma_directory, - config.fallback_directory)); + plasma::plasma_store_runner = std::move(store_runner); + // Initialize object store. - store_thread_ = std::thread(&plasma::PlasmaStoreRunner::Start, + std::map emptyMap; + store_thread_ = std::thread(&plasma::ObjectStoreRunnerInterface::Start, plasma::plasma_store_runner.get(), + std::ref(emptyMap), spill_objects_callback, object_store_full_callback, add_object_callback, @@ -70,8 +71,34 @@ ObjectManager::ObjectManager( self_node_id_(self_node_id), config_(config), object_directory_(object_directory), - object_store_internal_(std::make_unique( + rpc_work_(rpc_service_), + object_manager_server_("ObjectManager", + config_.object_manager_port, + config_.object_manager_address == "127.0.0.1", + config_.rpc_service_threads_number), + object_manager_service_(rpc_service_, *this), + client_call_manager_(main_service, config_.rpc_service_threads_number), + restore_spilled_object_(restore_spilled_object), + get_spilled_object_url_(get_spilled_object_url), + pull_retry_timer_(*main_service_, + boost::posix_time::milliseconds(config.timer_freq_ms)) { + RAY_CHECK(config_.rpc_service_threads_number > 0); + + PluginManager& plugin_manager = PluginManager::GetInstance(); + json params_map = json::parse(config.plugin_params); + if (config.plugin_name == "default"){ + params_map["plasma_directory"] = config.plasma_directory; + params_map["fallback_directory"] = config.fallback_directory; + params_map["object_store_memory"] = config.object_store_memory; + params_map["store_socket_name"] = config.store_socket_name; + params_map["huge_pages"] = config.huge_pages; + } + plugin_manager.SetObjectStores(config.plugin_name, + config.plugin_path, + params_map.dump()); + object_store_internal_ = std::make_unique( config, + std::move(plugin_manager.CreateObjectStoreRunnerInstance(config.plugin_name)), spill_objects_callback, object_store_full_callback, /*add_object_callback=*/ @@ -93,21 +120,11 @@ ObjectManager::ObjectManager( delete_object_callback(object_id); }, "ObjectManager.ObjectDeleted"); - })), - buffer_pool_store_client_(std::make_shared()), - buffer_pool_(buffer_pool_store_client_, config_.object_chunk_size), - rpc_work_(rpc_service_), - object_manager_server_("ObjectManager", - config_.object_manager_port, - config_.object_manager_address == "127.0.0.1", - config_.rpc_service_threads_number), - object_manager_service_(rpc_service_, *this), - client_call_manager_(main_service, config_.rpc_service_threads_number), - restore_spilled_object_(restore_spilled_object), - get_spilled_object_url_(get_spilled_object_url), - pull_retry_timer_(*main_service_, - boost::posix_time::milliseconds(config.timer_freq_ms)) { - RAY_CHECK(config_.rpc_service_threads_number > 0); + }); + buffer_pool_store_client_ = plugin_manager.CreateObjectStoreClientInstance(config.plugin_name); + //buffer_pool_store_client_ = plugin_manager.CreateObjectStoreClientInstance(""); + buffer_pool_ = std::make_shared(buffer_pool_store_client_, config_.object_chunk_size); + //RAY_LOG(INFO) << buffer_pool_store_client_->DebugString(); push_manager_.reset(new PushManager(/* max_chunks_in_flight= */ std::max( static_cast(1L), @@ -126,7 +143,7 @@ ObjectManager::ObjectManager( // We must abort this object because it may have only been partially // created and will cause a leak if we never receive the rest of the // object. This is a no-op if the object is already sealed or evicted. - buffer_pool_.AbortCreate(object_id); + buffer_pool_->AbortCreate(object_id); }; const auto &get_time = []() { return absl::GetCurrentTimeNanos() / 1e9; }; int64_t available_memory = config.object_store_memory; @@ -154,13 +171,20 @@ ObjectManager::ObjectManager( ObjectManager::~ObjectManager() { StopRpcService(); } +// std::shared_ptr ObjectManager::CreateObjectStoreClientInstance() { +// if(config_.plugin_name == "default"){ +// return std::make_shared(); +// } +// return plugin_manager_.client_creators_[config_.plugin_name](); +// }; + void ObjectManager::Stop() { plasma::plasma_store_runner->Stop(); object_store_internal_.reset(); } bool ObjectManager::IsPlasmaObjectSpillable(const ObjectID &object_id) { - return plasma::plasma_store_runner->IsPlasmaObjectSpillable(object_id); + return plasma::plasma_store_runner->IsObjectSpillable(object_id); } void ObjectManager::RunRpcService(int index) { @@ -390,7 +414,7 @@ void ObjectManager::PushLocalObject(const ObjectID &object_id, const NodeID &nod owner_address.set_worker_id(object_info.owner_worker_id.Binary()); std::pair, ray::Status> reader_status = - buffer_pool_.CreateObjectReader(object_id, owner_address); + buffer_pool_->CreateObjectReader(object_id, owner_address); Status status = reader_status.second; if (!status.ok()) { RAY_LOG_EVERY_N_OR_DEBUG(INFO, 100) @@ -602,7 +626,7 @@ bool ObjectManager::ReceiveObjectChunk(const NodeID &node_id, // This object is no longer being actively pulled. Do not create the object. return false; } - auto chunk_status = buffer_pool_.CreateChunk( + auto chunk_status = buffer_pool_->CreateChunk( object_id, owner_address, data_size, metadata_size, chunk_index); if (!pull_manager_->IsObjectActive(object_id)) { num_chunks_received_cancelled_++; @@ -612,13 +636,13 @@ bool ObjectManager::ReceiveObjectChunk(const NodeID &node_id, // the chunk. RAY_LOG(INFO) << "Aborting object creation because it is no longer actively pulled: " << object_id; - buffer_pool_.AbortCreate(object_id); + buffer_pool_->AbortCreate(object_id); return false; } if (chunk_status.ok()) { // Avoid handling this chunk if it's already being handled by another process. - buffer_pool_.WriteChunk(object_id, data_size, metadata_size, chunk_index, data); + buffer_pool_->WriteChunk(object_id, data_size, metadata_size, chunk_index, data); return true; } else { num_chunks_received_failed_due_to_plasma_++; @@ -656,7 +680,7 @@ void ObjectManager::HandleFreeObjects(rpc::FreeObjectsRequest request, void ObjectManager::FreeObjects(const std::vector &object_ids, bool local_only) { - buffer_pool_.FreeObjects(object_ids); + buffer_pool_->FreeObjects(object_ids); if (!local_only) { const auto remote_connections = object_directory_->LookupAllRemoteConnections(); std::vector> rpc_clients; @@ -732,7 +756,7 @@ std::string ObjectManager::DebugString() const { result << "\nEvent stats:" << rpc_service_.stats().StatsString(); result << "\n" << push_manager_->DebugString(); result << "\n" << object_directory_->DebugString(); - result << "\n" << buffer_pool_.DebugString(); + result << "\n" << buffer_pool_->DebugString(); result << "\n" << pull_manager_->DebugString(); return result.str(); } @@ -804,3 +828,5 @@ void ObjectManager::Tick(const boost::system::error_code &e) { } } // namespace ray + + diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index 305cfc6f50be..4bb885004c74 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -25,7 +25,7 @@ #include #include #include - +#include #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" #include "absl/time/clock.h" @@ -45,6 +45,7 @@ #include "ray/rpc/object_manager/object_manager_server.h" #include "src/ray/protobuf/common.pb.h" #include "src/ray/protobuf/node_manager.pb.h" +#include "ray/object_manager/plugin_manager.h" namespace ray { @@ -76,6 +77,12 @@ struct ObjectManagerConfig { int rpc_service_threads_number; /// Initial memory allocation for store. int64_t object_store_memory = -1; + /// The name of the plugin. + std::string plugin_name; + /// The path to the plugin shared library. + std::string plugin_path; + /// The parameters of the plugin. + std::string plugin_params; /// The directory for shared memory files. std::string plasma_directory; /// The directory for fallback allocation files. @@ -91,6 +98,7 @@ struct LocalObjectInfo { class ObjectStoreRunner { public: ObjectStoreRunner(const ObjectManagerConfig &config, + std::unique_ptr store_runner, SpillObjectsCallback spill_objects_callback, std::function object_store_full_callback, AddObjectCallback add_object_callback, @@ -255,6 +263,8 @@ class ObjectManager : public ObjectManagerInterface, bool PullManagerHasPullsQueued() const { return pull_manager_->HasPullsQueued(); } + //std::shared_ptr CreateObjectStoreClientInstance(); + private: friend class TestObjectManager; @@ -401,15 +411,18 @@ class ObjectManager : public ObjectManagerInterface, /// The object directory interface to access object information. IObjectDirectory *object_directory_; + //PluginManager &plugin_manager_; + /// Object store runner. std::unique_ptr object_store_internal_; /// Used by the buffer pool to read and write objects in the local store /// during object transfers. - std::shared_ptr buffer_pool_store_client_; + std::shared_ptr buffer_pool_store_client_; /// Manages accesses to local objects for object transfers. - ObjectBufferPool buffer_pool_; + //ObjectBufferPool buffer_pool_; + std::shared_ptr buffer_pool_; /// Multi-thread asio service, deal with all outgoing and incoming RPC request. instrumented_io_context rpc_service_; @@ -491,4 +504,7 @@ class ObjectManager : public ObjectManagerInterface, size_t num_chunks_received_failed_due_to_plasma_ = 0; }; +// extern PluginManager& plugin_manager_; + } // namespace ray + diff --git a/src/ray/object_manager/plasma/client.cc b/src/ray/object_manager/plasma/client.cc index 5de3fcc20e7d..7ec3f542f0c9 100644 --- a/src/ray/object_manager/plasma/client.cc +++ b/src/ray/object_manager/plasma/client.cc @@ -165,6 +165,8 @@ class PlasmaClient::Impl : public std::enable_shared_from_this deletion_cache_; /// A mutex which protects this class. std::recursive_mutex client_mutex_; + /// A bool to indicate if global shared memory is used. + bool is_global_shm_ = false; }; PlasmaBuffer::~PlasmaBuffer() { RAY_UNUSED(client_->Release(object_id_)); } @@ -853,6 +857,11 @@ bool PlasmaClient::IsInUse(const ObjectID &object_id) { return impl_->IsInUse(object_id); } +bool PlasmaClient::IsGlobalShm() { + return impl_->IsGlobalShm(); +} + int64_t PlasmaClient::store_capacity() { return impl_->store_capacity(); } } // namespace plasma + diff --git a/src/ray/object_manager/plasma/client.h b/src/ray/object_manager/plasma/client.h index d466528ecd27..b76e58ac75d9 100644 --- a/src/ray/object_manager/plasma/client.h +++ b/src/ray/object_manager/plasma/client.h @@ -17,133 +17,11 @@ #pragma once -#include -#include -#include -#include - -#include "ray/common/buffer.h" -#include "ray/common/status.h" -#include "ray/object_manager/plasma/common.h" -#include "ray/util/visibility.h" -#include "src/ray/protobuf/common.pb.h" +#include "ray/object_manager/plasma/object_store_client_interface.h" namespace plasma { -using ray::Buffer; -using ray::SharedMemoryBuffer; -using ray::Status; - -/// Object buffer data structure. -struct ObjectBuffer { - /// The data buffer. - std::shared_ptr data; - /// The metadata buffer. - std::shared_ptr metadata; - /// The device number. - int device_num; -}; - -class PlasmaClientInterface { - public: - virtual ~PlasmaClientInterface(){}; - - /// Tell Plasma that the client no longer needs the object. This should be - /// called after Get() or Create() when the client is done with the object. - /// After this call, the buffer returned by Get() is no longer valid. - /// - /// \param object_id The ID of the object that is no longer needed. - /// \return The return status. - virtual Status Release(const ObjectID &object_id) = 0; - - /// Disconnect from the local plasma instance, including the local store and - /// manager. - /// - /// \return The return status. - virtual Status Disconnect() = 0; - - /// Get some objects from the Plasma Store. This function will block until the - /// objects have all been created and sealed in the Plasma Store or the - /// timeout expires. - /// - /// If an object was not retrieved, the corresponding metadata and data - /// fields in the ObjectBuffer structure will evaluate to false. - /// Objects are automatically released by the client when their buffers - /// get out of scope. - /// - /// \param object_ids The IDs of the objects to get. - /// \param timeout_ms The amount of time in milliseconds to wait before this - /// request times out. If this value is -1, then no timeout is set. - /// \param[out] object_buffers The object results. - /// \param is_from_worker Whether or not if the Get request comes from a Ray workers. - /// \return The return status. - virtual Status Get(const std::vector &object_ids, - int64_t timeout_ms, - std::vector *object_buffers, - bool is_from_worker) = 0; - - /// Seal an object in the object store. The object will be immutable after - /// this - /// call. - /// - /// \param object_id The ID of the object to seal. - /// \return The return status. - virtual Status Seal(const ObjectID &object_id) = 0; - - /// Abort an unsealed object in the object store. If the abort succeeds, then - /// it will be as if the object was never created at all. The unsealed object - /// must have only a single reference (the one that would have been removed by - /// calling Seal). - /// - /// \param object_id The ID of the object to abort. - /// \return The return status. - virtual Status Abort(const ObjectID &object_id) = 0; - - /// Create an object in the Plasma Store. Any metadata for this object must be - /// be passed in when the object is created. - /// - /// If this request cannot be fulfilled immediately, this call will block until - /// enough objects have been spilled to make space. If spilling cannot free - /// enough space, an out of memory error will be returned. - /// - /// \param object_id The ID to use for the newly created object. - /// \param owner_address The address of the object's owner. - /// \param data_size The size in bytes of the space to be allocated for this - /// object's - /// data (this does not include space used for metadata). - /// \param metadata The object's metadata. If there is no metadata, this - /// pointer should be NULL. - /// \param metadata_size The size in bytes of the metadata. If there is no - /// metadata, this should be 0. - /// \param data The address of the newly created object will be written here. - /// \param device_num The number of the device where the object is being - /// created. - /// device_num = 0 corresponds to the host, - /// device_num = 1 corresponds to GPU0, - /// device_num = 2 corresponds to GPU1, etc. - /// \return The return status. - /// - /// The returned object must be released once it is done with. It must also - /// be either sealed or aborted. - virtual Status CreateAndSpillIfNeeded(const ObjectID &object_id, - const ray::rpc::Address &owner_address, - int64_t data_size, - const uint8_t *metadata, - int64_t metadata_size, - std::shared_ptr *data, - plasma::flatbuf::ObjectSource source, - int device_num = 0) = 0; - - /// Delete a list of objects from the object store. This currently assumes that the - /// object is present, has been sealed and not used by another client. Otherwise, - /// it is a no operation. - /// - /// \param object_ids The list of IDs of the objects to delete. - /// \return The return status. If all the objects are non-existent, return OK. - virtual Status Delete(const std::vector &object_ids) = 0; -}; - -class PlasmaClient : public PlasmaClientInterface { +class PlasmaClient : public ObjectStoreClientInterface { public: PlasmaClient(); ~PlasmaClient(); @@ -163,7 +41,7 @@ class PlasmaClient : public PlasmaClientInterface { Status Connect(const std::string &store_socket_name, const std::string &manager_socket_name = "", int release_delay = 0, - int num_retries = -1); + int num_retries = -1) override; /// Create an object in the Plasma Store. Any metadata for this object must be /// be passed in when the object is created. @@ -198,7 +76,7 @@ class PlasmaClient : public PlasmaClientInterface { int64_t metadata_size, std::shared_ptr *data, plasma::flatbuf::ObjectSource source, - int device_num = 0); + int device_num = 0) override; /// Create an object in the Plasma Store. Any metadata for this object must be /// be passed in when the object is created. @@ -233,7 +111,7 @@ class PlasmaClient : public PlasmaClientInterface { int64_t metadata_size, std::shared_ptr *data, plasma::flatbuf::ObjectSource source, - int device_num = 0); + int device_num = 0) override; /// Get some objects from the Plasma Store. This function will block until the /// objects have all been created and sealed in the Plasma Store or the @@ -253,7 +131,7 @@ class PlasmaClient : public PlasmaClientInterface { Status Get(const std::vector &object_ids, int64_t timeout_ms, std::vector *object_buffers, - bool is_from_worker); + bool is_from_worker) override; /// Tell Plasma that the client no longer needs the object. This should be /// called after Get() or Create() when the client is done with the object. @@ -261,7 +139,7 @@ class PlasmaClient : public PlasmaClientInterface { /// /// \param object_id The ID of the object that is no longer needed. /// \return The return status. - Status Release(const ObjectID &object_id); + Status Release(const ObjectID &object_id) override; /// Check if the object store contains a particular object and the object has /// been sealed. The result will be stored in has_object. @@ -273,7 +151,7 @@ class PlasmaClient : public PlasmaClientInterface { /// \param has_object The function will write true at this address if /// the object is present and false if it is not present. /// \return The return status. - Status Contains(const ObjectID &object_id, bool *has_object); + Status Contains(const ObjectID &object_id, bool *has_object) override; /// Abort an unsealed object in the object store. If the abort succeeds, then /// it will be as if the object was never created at all. The unsealed object @@ -282,15 +160,15 @@ class PlasmaClient : public PlasmaClientInterface { /// /// \param object_id The ID of the object to abort. /// \return The return status. - Status Abort(const ObjectID &object_id); + Status Abort(const ObjectID &object_id) override; - /// Seal an object in the object store. The object will be immutable after +/// Seal an object in the object store. The object will be immutable after /// this /// call. /// /// \param object_id The ID of the object to seal. /// \return The return status. - Status Seal(const ObjectID &object_id); + Status Seal(const ObjectID &object_id) override; /// Delete an object from the object store. This currently assumes that the /// object is present, has been sealed and not used by another client. Otherwise, @@ -309,7 +187,7 @@ class PlasmaClient : public PlasmaClientInterface { /// /// \param object_ids The list of IDs of the objects to delete. /// \return The return status. If all the objects are non-existent, return OK. - Status Delete(const std::vector &object_ids); + Status Delete(const std::vector &object_ids) override; /// Delete objects until we have freed up num_bytes bytes or there are no more /// released objects that can be deleted. @@ -318,23 +196,41 @@ class PlasmaClient : public PlasmaClientInterface { /// \param num_bytes_evicted Out parameter for total number of bytes of space /// retrieved. /// \return The return status. - Status Evict(int64_t num_bytes, int64_t &num_bytes_evicted); + Status Evict(int64_t num_bytes, int64_t &num_bytes_evicted) override; /// Disconnect from the local plasma instance, including the local store and /// manager. /// /// \return The return status. - Status Disconnect(); + Status Disconnect() override; /// Get the current debug string from the plasma store server. /// /// \return The debug string. - std::string DebugString(); + std::string DebugString() override; /// Get the memory capacity of the store. /// /// \return Memory capacity of the store in bytes. - int64_t store_capacity(); + int64_t store_capacity() override; + + /// Get if global shared memory is used. + /// + /// \return True if global shared memory is used, other then false. + bool IsGlobalShm() override; + + Status Authenticate(const std::string& user, + const std::string& passwd) override { + return Status::OK(); + }; + + Status Authenticate(const std::string& secret) override { + return Status::OK(); + }; + + void MemCpy(void* dest, const void* src, size_t len) override { + memcpy(dest, src, len); + }; private: /// Retry a previous create call using the returned request ID. @@ -360,4 +256,5 @@ class PlasmaClient : public PlasmaClientInterface { std::shared_ptr impl_; }; -} // namespace plasma +} // namespace plasma + diff --git a/src/ray/object_manager/plasma/object_store_client_interface.h b/src/ray/object_manager/plasma/object_store_client_interface.h new file mode 100644 index 000000000000..f8bf9d3654a6 --- /dev/null +++ b/src/ray/object_manager/plasma/object_store_client_interface.h @@ -0,0 +1,256 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include + +#include "ray/common/buffer.h" +#include "ray/common/status.h" +#include "ray/object_manager/plasma/common.h" +#include "ray/util/visibility.h" +#include "src/ray/protobuf/common.pb.h" + +namespace plasma { + +using ray::Buffer; +using ray::SharedMemoryBuffer; +using ray::Status; + +/// Object buffer data structure. +struct ObjectBuffer { + /// The data buffer. + std::shared_ptr data; + /// The metadata buffer. + std::shared_ptr metadata; + /// The device number. + int device_num; +}; + +class ObjectStoreClientInterface : public std::enable_shared_from_this { + public: + virtual ~ObjectStoreClientInterface(){}; + + /// Tell Store that the client no longer needs the object. This should be + /// called after Get() or Create() when the client is done with the object. + /// After this call, the buffer returned by Get() is no longer valid. + /// + /// \param object_id The ID of the object that is no longer needed. + /// \return The return status. + virtual Status Release(const ObjectID &object_id) = 0; + + /// Disconnect from the Store. + /// + /// \return The return status. + virtual Status Disconnect() = 0; + + /// Get some objects from the Store. This function will block until the + /// objects have all been created and sealed in the Plasma Store or the + /// timeout expires. + /// + /// If an object was not retrieved, the corresponding metadata and data + /// fields in the ObjectBuffer structure will evaluate to false. + /// Objects are automatically released by the client when their buffers + /// get out of scope. + /// + /// \param object_ids The IDs of the objects to get. + /// \param timeout_ms The amount of time in milliseconds to wait before this + /// request times out. If this value is -1, then no timeout is set. + /// \param[out] object_buffers The object results. + /// \param is_from_worker Whether or not if the Get request comes from a Ray workers. + /// \return The return status. + virtual Status Get(const std::vector &object_ids, + int64_t timeout_ms, + std::vector *object_buffers, + bool is_from_worker) = 0; + + /// Seal an object in the object store. The object will be immutable after + /// this + /// call. + /// + /// \param object_id The ID of the object to seal. + /// \return The return status. + virtual Status Seal(const ObjectID &object_id) = 0; + + /// Abort an unsealed object in the object store. If the abort succeeds, then + /// it will be as if the object was never created at all. The unsealed object + /// must have only a single reference (the one that would have been removed by + /// calling Seal). + /// + /// \param object_id The ID of the object to abort. + /// \return The return status. + virtual Status Abort(const ObjectID &object_id) = 0; + + /// Create an object in the Plasma Store. Any metadata for this object must be + /// be passed in when the object is created. + /// + /// If this request cannot be fulfilled immediately, this call will block until + /// enough objects have been spilled to make space. If spilling cannot free + /// enough space, an out of memory error will be returned. + /// + /// \param object_id The ID to use for the newly created object. + /// \param owner_address The address of the object's owner. + /// \param data_size The size in bytes of the space to be allocated for this + /// object's + /// data (this does not include space used for metadata). + /// \param metadata The object's metadata. If there is no metadata, this + /// pointer should be NULL. + /// \param metadata_size The size in bytes of the metadata. If there is no + /// metadata, this should be 0. + /// \param data The address of the newly created object will be written here. + /// \param device_num The number of the device where the object is being + /// created. + /// device_num = 0 corresponds to the host, + /// device_num = 1 corresponds to GPU0, + /// device_num = 2 corresponds to GPU1, etc. + /// \return The return status. + /// + /// The returned object must be released once it is done with. It must also + /// be either sealed or aborted. + virtual Status CreateAndSpillIfNeeded(const ObjectID &object_id, + const ray::rpc::Address &owner_address, + int64_t data_size, + const uint8_t *metadata, + int64_t metadata_size, + std::shared_ptr *data, + plasma::flatbuf::ObjectSource source, + int device_num = 0) = 0; + + /// Delete a list of objects from the object store. This currently assumes that the + /// object is present, has been sealed and not used by another client. Otherwise, + /// it is a no operation. + /// + /// \param object_ids The list of IDs of the objects to delete. + /// \return The return status. If all the objects are non-existent, return OK. + virtual Status Delete(const std::vector &object_ids) = 0; + + /// Connect to the store. Return the resulting connection. + /// + /// \param store_socket_name The name of the UNIX domain socket to use to + /// connect to the store. + /// \param manager_socket_name The name of the UNIX domain socket to use to + /// connect to the manager. If this is "", then this + /// function will not connect to a manager. + /// Note that manager is no longer supported, this function + /// will return failure if this is not "". + /// \param release_delay Deprecated (not used). + /// \param num_retries number of attempts to connect to IPC socket, default 50 + /// \return The return status. + virtual Status Connect(const std::string &store_socket_name, + const std::string &manager_socket_name, + int release_delay = 0, + int num_retries = -1) = 0; + + /// Authentication to the object store. + /// \param user user name. + /// \param passwd password + /// \return The return status. + virtual Status Authenticate(const std::string& user, + const std::string& passwd) = 0; + + /// Authentication to the object store. + /// \param secret The user secret. + /// \return The return status. + virtual Status Authenticate(const std::string& secret) = 0; + + /// The object storage optimized memory copy. + /// \param dest The destination address. + /// \param src The source address. + /// \param len Number of bytes to copy. + virtual void MemCpy(void* dest, + const void* src, + size_t len) = 0; + + /// Create an object in the Plasma Store. Any metadata for this object must be + /// be passed in when the object is created. + /// + /// The plasma store will attempt to fulfill this request immediately. If it + /// cannot be fulfilled immediately, an error will be returned to the client. + /// + /// \param object_id The ID to use for the newly created object. + /// \param owner_address The address of the object's owner. + /// \param data_size The size in bytes of the space to be allocated for this + /// object's + /// data (this does not include space used for metadata). + /// \param metadata The object's metadata. If there is no metadata, this + /// pointer + /// should be NULL. + /// \param metadata_size The size in bytes of the metadata. If there is no + /// metadata, this should be 0. + /// \param data The address of the newly created object will be written here. + /// \param device_num The number of the device where the object is being + /// created. + /// device_num = 0 corresponds to the host, + /// device_num = 1 corresponds to GPU0, + /// device_num = 2 corresponds to GPU1, etc. + /// \return The return status. + /// + /// The returned object must be released once it is done with. It must also + /// be either sealed or aborted. + virtual Status TryCreateImmediately(const ObjectID &object_id, + const ray::rpc::Address &owner_address, + int64_t data_size, + const uint8_t *metadata, + int64_t metadata_size, + std::shared_ptr *data, + plasma::flatbuf::ObjectSource source, + int device_num = 0) = 0; + + /// Check if the object store contains a particular object and the object has + /// been sealed. The result will be stored in has_object. + /// + /// @todo: We may want to indicate if the object has been created but not + /// sealed. + /// + /// \param object_id The ID of the object whose presence we are checking. + /// \param has_object The function will write true at this address if + /// the object is present and false if it is not present. + /// \return The return status. + virtual Status Contains(const ObjectID &object_id, bool *has_object) = 0; + + /// Delete objects until we have freed up num_bytes bytes or there are no more + /// released objects that can be deleted. + /// + /// \param num_bytes The number of bytes to try to free up. + /// \param num_bytes_evicted Out parameter for total number of bytes of space + /// retrieved. + /// \return The return status. + virtual Status Evict(int64_t num_bytes, int64_t &num_bytes_evicted) = 0; + + /// Get the current debug string from the plasma store server. + /// + /// \return The debug string. + virtual std::string DebugString() = 0; + + /// Get the memory capacity of the store. + /// + /// \return Memory capacity of the store in bytes. + virtual int64_t store_capacity() = 0; + + /// Get if global shared memory is used. + /// + /// \return True if global shared memory is used, other then false. + virtual bool IsGlobalShm() = 0; + + +}; + +} // namespace plasma + diff --git a/src/ray/object_manager/plasma/object_store_runner_interface.h b/src/ray/object_manager/plasma/object_store_runner_interface.h new file mode 100644 index 000000000000..40a56a8ce6cc --- /dev/null +++ b/src/ray/object_manager/plasma/object_store_runner_interface.h @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include + +#include "ray/common/buffer.h" +#include "ray/common/status.h" +#include "ray/object_manager/plasma/common.h" +#include "ray/util/visibility.h" +#include "src/ray/protobuf/common.pb.h" + +namespace plasma { + +class ObjectStoreRunnerInterface : public std::enable_shared_from_this { + public: + virtual ~ObjectStoreRunnerInterface(){}; + + /// Start the object store runner. The params are a series of key-value. + virtual void Start(const std::map& params, + ray::SpillObjectsCallback spill_objects_callback, + std::function object_store_full_callback, + ray::AddObjectCallback add_object_callback, + ray::DeleteObjectCallback delete_object_callback) = 0; + + // Stop the object store + virtual void Stop() = 0; + + virtual bool IsObjectSpillable(const ObjectID &object_id) = 0; + + virtual int64_t GetConsumedBytes() = 0; + + virtual int64_t GetFallbackAllocated() const = 0; + + virtual void GetAvailableMemoryAsync(std::function callback) const = 0; + + // The Current total allocated available memory size + virtual int64_t GetTotalMemorySize() const = 0; + + // Maximal available memory size. It can be different from the Total memory + // size if the memory is dynamically expandable. + virtual int64_t GetMaxMemorySize() const = 0; +}; + +} // namespace plasma diff --git a/src/ray/object_manager/plasma/store_runner.cc b/src/ray/object_manager/plasma/store_runner.cc index ca1f8a71e455..b4d457df5547 100644 --- a/src/ray/object_manager/plasma/store_runner.cc +++ b/src/ray/object_manager/plasma/store_runner.cc @@ -80,7 +80,8 @@ PlasmaStoreRunner::PlasmaStoreRunner(std::string socket_name, fallback_directory_ = fallback_directory; } -void PlasmaStoreRunner::Start(ray::SpillObjectsCallback spill_objects_callback, +void PlasmaStoreRunner::Start(const std::map& params, + ray::SpillObjectsCallback spill_objects_callback, std::function object_store_full_callback, ray::AddObjectCallback add_object_callback, ray::DeleteObjectCallback delete_object_callback) { @@ -137,7 +138,7 @@ void PlasmaStoreRunner::Shutdown() { } } -bool PlasmaStoreRunner::IsPlasmaObjectSpillable(const ObjectID &object_id) { +bool PlasmaStoreRunner::IsObjectSpillable(const ObjectID &object_id) { return store_->IsObjectSpillable(object_id); } @@ -148,6 +149,7 @@ int64_t PlasmaStoreRunner::GetFallbackAllocated() const { return allocator_ ? allocator_->FallbackAllocated() : 0; } -std::unique_ptr plasma_store_runner; - +//std::unique_ptr plasma_store_runner; +std::unique_ptr plasma_store_runner; } // namespace plasma + diff --git a/src/ray/object_manager/plasma/store_runner.h b/src/ray/object_manager/plasma/store_runner.h index 22e31b014684..2824c77f3a15 100644 --- a/src/ray/object_manager/plasma/store_runner.h +++ b/src/ray/object_manager/plasma/store_runner.h @@ -8,32 +8,38 @@ #include "ray/common/file_system_monitor.h" #include "ray/object_manager/plasma/plasma_allocator.h" #include "ray/object_manager/plasma/store.h" +#include "ray/object_manager/plasma/object_store_runner_interface.h" namespace plasma { -class PlasmaStoreRunner { +class PlasmaStoreRunner : public ObjectStoreRunnerInterface { public: PlasmaStoreRunner(std::string socket_name, int64_t system_memory, bool hugepages_enabled, std::string plasma_directory, std::string fallback_directory); - void Start(ray::SpillObjectsCallback spill_objects_callback, - std::function object_store_full_callback, - ray::AddObjectCallback add_object_callback, - ray::DeleteObjectCallback delete_object_callback); - void Stop(); + void Start(const std::map& params, + ray::SpillObjectsCallback spill_objects_callback, + std::function object_store_full_callback, + ray::AddObjectCallback add_object_callback, + ray::DeleteObjectCallback delete_object_callback) override; + void Stop() override; + + bool IsObjectSpillable(const ObjectID &object_id) override; - bool IsPlasmaObjectSpillable(const ObjectID &object_id); + int64_t GetConsumedBytes() override; - int64_t GetConsumedBytes(); - int64_t GetFallbackAllocated() const; + int64_t GetFallbackAllocated() const override; - void GetAvailableMemoryAsync(std::function callback) const { + void GetAvailableMemoryAsync(std::function callback) const override { main_service_.post([this, callback]() { store_->GetAvailableMemory(callback); }, - "PlasmaStoreRunner.GetAvailableMemory"); + "PlasmaStoreRunner.GetAvailableMemory"); } + int64_t GetTotalMemorySize() const override { return system_memory_; }; + int64_t GetMaxMemorySize() const override { return system_memory_; }; + private: void Shutdown(); mutable absl::Mutex store_runner_mutex_; @@ -53,6 +59,7 @@ class PlasmaStoreRunner { // 2) The thirdparty dlmalloc library cannot be contained in a local variable, // so even we use a local variable for plasma store, it does not provide // better isolation. -extern std::unique_ptr plasma_store_runner; +//extern std::unique_ptr plasma_store_runner; +extern std::unique_ptr plasma_store_runner; -} // namespace plasma +} // namespace plasma diff --git a/src/ray/object_manager/plugin_manager.cc b/src/ray/object_manager/plugin_manager.cc new file mode 100644 index 000000000000..a9906fb62fd2 --- /dev/null +++ b/src/ray/object_manager/plugin_manager.cc @@ -0,0 +1,169 @@ +#include "ray/object_manager/plugin_manager.h" + +#include "nlohmann/json.hpp" + +using json = nlohmann::json; + +namespace ray { + +using DefaultClientCreator = std::shared_ptr (*)(); +using DefaultRunnerCreator = std::unique_ptr (*)( + std::string, + int64_t, + bool, + std::string, + std::string); +using PluginRunnerCreator = std::unique_ptr (*)(); +using PluginClientCreator = std::shared_ptr (*)(); + +PluginManager &PluginManager::GetInstance() { + static PluginManager instance; + return instance; +} + +/// A creator function of default object store client (PlasmaClient). +std::shared_ptr CreateDefaultClientInstance() { + return std::make_shared(); +} + +/// A creator function of default object store runner (PlasmaStoreRunner). +std::unique_ptr CreateDefaultRunnerInstance( + std::string store_socket_name, + int64_t object_store_memory, + bool huge_pages, + std::string plasma_directory, + std::string fallback_directory) { + return std::make_unique(store_socket_name, + object_store_memory, + huge_pages, + plasma_directory, + fallback_directory); +} + +void PluginManager::LoadObjectStorePlugin() { + void *handle = dlopen(plugin_path_.c_str(), RTLD_NOW); + if (!handle) { + std::cerr << "Failed to load shared library: " << dlerror() << std::endl; + return; + } + + PluginClientCreator client_creator = + reinterpret_cast(dlsym(handle, "CreateClient")); + if (!client_creator) { + std::cerr << "Failed to get CreateClient function: " << dlerror() + << std::endl; + dlclose(handle); + return; + } + client_creators_[plugin_name_] = client_creator; + + PluginRunnerCreator runner_creator = + reinterpret_cast(dlsym(handle, "CreateRunner")); + if (!runner_creator) { + std::cerr << "Failed to get CreateRunner function: " << dlerror() + << std::endl; + dlclose(handle); + return; + } + runner_creators_[plugin_name_] = runner_creator; + + dlclose(handle); + return; +} + +std::shared_ptr +PluginManager::CreateObjectStoreClientInstance(const std::string &plugin_name) { + + if (plugin_name == "default") { + return std::any_cast(client_creators_[plugin_name])(); + } + return std::any_cast(client_creators_[plugin_name])(); +} + +std::unique_ptr +PluginManager::CreateObjectStoreRunnerInstance(const std::string &plugin_name) { + if (plugin_name == "default") { + return std::any_cast(runner_creators_[plugin_name])( + default_runner_params_.store_socket_name, + default_runner_params_.object_store_memory, + default_runner_params_.huge_pages, + default_runner_params_.plasma_directory, + default_runner_params_.fallback_directory); + } + return std::any_cast(runner_creators_[plugin_name])(); +} + +void PluginManager::LoadObjectStoreClientPlugin() { + void *handle = dlopen(plugin_path_.c_str(), RTLD_NOW); + if (!handle) { + std::cerr << "Failed to load shared library: " << dlerror() << std::endl; + return; + } + + PluginClientCreator client_creator = + reinterpret_cast(dlsym(handle, "CreateClient")); + if (!client_creator) { + std::cerr << "Failed to get CreateClient function: " << dlerror() + << std::endl; + dlclose(handle); + return; + } + + client_creators_[plugin_name_] = client_creator; + dlclose(handle); + return; +} + +void PluginManager::SetObjectStoreClients(const std::string plugin_name, + const std::string plugin_path, + const std::string plugin_params) { + plugin_name_ = plugin_name; + plugin_path_ = plugin_path; + plugin_params_ = plugin_params; + + if (plugin_name != "default") { + LoadObjectStoreClientPlugin(); + } else { + default_client_creator_ = &CreateDefaultClientInstance; + client_creators_[plugin_name] = default_client_creator_; + } +} + +void PluginManager::SetObjectStores(const std::string plugin_name, + const std::string plugin_path, + const std::string plugin_params) { + plugin_name_ = plugin_name; + plugin_path_ = plugin_path; + plugin_params_ = plugin_params; + + // LoadPlugin + if (plugin_name != "default") { + LoadObjectStorePlugin(); + } else { + + // Parse the plugin parameters + json params_map = json::parse(plugin_params); + std::string store_socket_name = params_map.value("store_socket_name", ""); + int64_t object_store_memory = params_map["object_store_memory"]; + bool huge_pages = params_map.value("huge_pages", false); + std::string plasma_directory = params_map.value("plasma_directory", ""); + std::string fallback_directory = params_map.value("fallback_directory", ""); + + + default_runner_params_.store_socket_name = store_socket_name; + default_runner_params_.object_store_memory = object_store_memory; + default_runner_params_.huge_pages = huge_pages; + default_runner_params_.plasma_directory = plasma_directory; + default_runner_params_.fallback_directory = fallback_directory; + + default_client_creator_ = &CreateDefaultClientInstance; + default_runner_creator_ = &CreateDefaultRunnerInstance; + + client_creators_["default"] = default_client_creator_; + runner_creators_["default"] = default_runner_creator_; + + } +} + +} // namespace ray + diff --git a/src/ray/object_manager/plugin_manager.h b/src/ray/object_manager/plugin_manager.h new file mode 100644 index 000000000000..a175025fec62 --- /dev/null +++ b/src/ray/object_manager/plugin_manager.h @@ -0,0 +1,130 @@ +#pragma once + +#include +#include +#include +#include +#include "ray/util/logging.h" + +#include "ray/object_manager/plasma/object_store_client_interface.h" +#include "ray/object_manager/plasma/object_store_runner_interface.h" +#include "ray/object_manager/plasma/client.h" +#include "ray/object_manager/plasma/store_runner.h" + + +namespace ray { + +class PluginManager { + public: + + using DefaultClientCreator = std::shared_ptr (*)(); + using DefaultRunnerCreator = std::unique_ptr (*)( + std::string, + int64_t, + bool, + std::string, + std::string); + + + /// Create a object store runner instance that is currently in use. + /// + /// \param plugin_name The name of the object store runner plugin. + std::unique_ptr CreateObjectStoreRunnerInstance( + const std::string& plugin_name); + + + /// Create a object store client instance that is currently in use. + /// + /// \param plugin_name The name of the object store client plugin. + std::shared_ptr CreateObjectStoreClientInstance( + const std::string& plugin_name); + + + /// Set up object store clients according to the name of the plugin, the path + /// of the plugin shared library, and also the provided plugin parameters. + /// If the plugin name is 'default', it would load the current PlasmaClient as the + /// client instance; if the plugin name is other than 'default', it would load the + /// client creators function from the shared library provided from the plugin path. + /// + /// \param plugin_name The name of the object store client plugin. + /// \param plugin_path The path to the plugin shared library. + /// \param plugin_params The parameters to the plugin. + void SetObjectStoreClients(const std::string plugin_name, + const std::string plugin_path, + const std::string plugin_params); + + + /// Set up both object store clients and runners according to the name of the + /// plugin, the path of the plugin shared library, and also the provided + /// plugin parameters. + /// + /// If the plugin name is 'default', it would load the current PlasmaClient and + /// PlasmaStoreRunner as the client/store runner instance; if the plugin name + /// is other than 'default', it would load the client/runner creators function + /// from the shared library provided by the plugin path. + /// + /// \param plugin_name The name of the object store client plugin. + /// \param plugin_path The path to the plugin shared library. + /// \param plugin_params The parameters to the plugin. + void SetObjectStores(const std::string plugin_name, + const std::string plugin_path, + const std::string plugin_params); + + /// Load the object Store client creators and store runner creators from the + /// shared library. + /// + void LoadObjectStorePlugin(); + + /// Load the object store client creators from the shared library. + /// + void LoadObjectStoreClientPlugin(); + + + /// Get an instance from the singleton PluginManager. + /// + static PluginManager &GetInstance(); + + private: + PluginManager(const PluginManager&) = delete; + PluginManager(PluginManager &&) = delete; + PluginManager& operator=(const PluginManager&) = delete; + PluginManager& operator=(PluginManager &&) = delete; + PluginManager(){} + ~PluginManager(){} + + /// Define parameter struct for creating default runner + struct DefaultRunnerParams { + std::string store_socket_name; + int64_t object_store_memory; + bool huge_pages; + std::string plasma_directory; + std::string fallback_directory; + }; + + /// Parameter struct for creating default object store runner + DefaultRunnerParams default_runner_params_; + + /// Object store client instance creators + std::map client_creators_; + + /// Object store runner instance creators + std::map runner_creators_; + + /// Creator for default object store client (original plasma client) + DefaultClientCreator default_client_creator_; + + /// Creator for default object store runner (original plasma store runner) + DefaultRunnerCreator default_runner_creator_; + + /// Plugin name + std::string plugin_name_; + + /// Plugin path + std::string plugin_path_; + + /// Plugin parameters + std::string plugin_params_; + +}; + +} // namespace ray diff --git a/src/ray/object_manager/test/object_buffer_pool_test.cc b/src/ray/object_manager/test/object_buffer_pool_test.cc index 1ae4602f06ac..55ab798c9969 100644 --- a/src/ray/object_manager/test/object_buffer_pool_test.cc +++ b/src/ray/object_manager/test/object_buffer_pool_test.cc @@ -22,7 +22,7 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" #include "ray/common/id.h" -#include "ray/object_manager/plasma/client.h" +#include "ray/object_manager/plasma/object_store_client_interface.h" #ifdef UNORDERED_VS_ABSL_MAPS_EVALUATION #include @@ -35,7 +35,7 @@ namespace ray { using ::testing::_; -class MockPlasmaClient : public plasma::PlasmaClientInterface { +class MockPlasmaClient : public plasma::ObjectStoreClientInterface { public: MOCK_METHOD1(Release, ray::Status(const ObjectID &object_id)); @@ -64,6 +64,41 @@ class MockPlasmaClient : public plasma::PlasmaClientInterface { } MOCK_METHOD1(Delete, ray::Status(const std::vector &object_ids)); + + MOCK_METHOD4(Connect, ray::Status(const std::string &store_socket_name, + const std::string &manager_socket_name, + int release_delay, + int num_retries)); + + MOCK_METHOD2(Authenticate, ray::Status(const std::string& user, + const std::string& passwd)); + + MOCK_METHOD1(Authenticate, ray::Status(const std::string& secret)); + + void MemCpy(void* dest, void* src, size_t len) { + memcpy(dest, src, len); + } + + ray::Status TryCreateImmediately(const ObjectID &object_id, + const ray::rpc::Address &owner_address, + int64_t data_size, + const uint8_t *metadata, + int64_t metadata_size, + std::shared_ptr *data, + plasma::flatbuf::ObjectSource source, + int device_num) { + *data = std::make_shared(data_size); + return ray::Status::OK(); + } + + MOCK_METHOD2(Contains, ray::Status(const ObjectID &object_id, bool *has_object)); + + MOCK_METHOD2(Evict, ray::Status(int64_t num_bytes, int64_t &num_bytes_evicted)); + + MOCK_METHOD0(DebugString, std::string()); + + MOCK_METHOD0(store_capacity, int64_t()); + }; class ObjectBufferPoolTest : public ::testing::Test { @@ -169,3 +204,4 @@ int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } + diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index f99c719c8a28..0f305d6ef2ce 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -62,6 +62,9 @@ DEFINE_string(resource_dir, "", "The path of this ray resource directory."); DEFINE_int32(ray_debugger_external, 0, "Make Ray debugger externally accessible."); // store options DEFINE_int64(object_store_memory, -1, "The initial memory of the object store."); +DEFINE_string(plugin_name, "", "The name of the plugin object store. "); +DEFINE_string(plugin_path, "", "The full path of the library if the plugin is a shared library."); +DEFINE_string(plugin_params, "", "The objectstore startup parameters as a series of key-value pairs."); DEFINE_string(node_name, "", "The user-provided identifier or name for this node."); DEFINE_string(session_name, "", "Session name (ClusterID) of the cluster."); #ifdef __linux__ @@ -85,6 +88,7 @@ int main(int argc, char *argv[]) { ray::RayLog::InstallFailureSignalHandler(argv[0]); ray::RayLog::InstallTerminateHandler(); + gflags::ParseCommandLineFlags(&argc, &argv, true); const std::string raylet_socket_name = FLAGS_raylet_socket_name; const std::string store_socket_name = FLAGS_store_socket_name; @@ -113,6 +117,9 @@ int main(int argc, char *argv[]) { const std::string resource_dir = FLAGS_resource_dir; const int ray_debugger_external = FLAGS_ray_debugger_external; const int64_t object_store_memory = FLAGS_object_store_memory; + const std::string plugin_name = (FLAGS_plugin_name == "None") ? "default" : FLAGS_plugin_name; + const std::string plugin_path = FLAGS_plugin_path; + const std::string plugin_params = FLAGS_plugin_params; const std::string plasma_directory = FLAGS_plasma_directory; const bool huge_pages = FLAGS_huge_pages; const int metrics_export_port = FLAGS_metrics_export_port; @@ -238,6 +245,9 @@ int main(int argc, char *argv[]) { RAY_LOG(FATAL) << "Object store memory should be set."; } object_manager_config.object_store_memory = object_store_memory; + object_manager_config.plugin_name = plugin_name; + object_manager_config.plugin_path = plugin_path; + object_manager_config.plugin_params = plugin_params; object_manager_config.max_bytes_in_flight = RayConfig::instance().object_manager_max_bytes_in_flight(); object_manager_config.plasma_directory = plasma_directory; @@ -319,3 +329,4 @@ int main(int argc, char *argv[]) { main_service.run(); } #endif + diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index ca0b4e015cd7..dad70cd811de 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -38,9 +38,12 @@ #include "ray/util/event_label.h" #include "ray/util/sample.h" #include "ray/util/util.h" +#include "ray/object_manager/plugin_manager.h" namespace { +ray::PluginManager& plugin_manager = ray::PluginManager::GetInstance(); + #define RAY_CHECK_ENUM(x, y) \ static_assert(static_cast(x) == static_cast(y), "protocol mismatch") @@ -229,6 +232,7 @@ NodeManager::NodeManager(instrumented_io_context &io_service, ref.set_object_id(object_id.Binary()); MarkObjectsAsFailed(error_type, {ref}, JobID::Nil()); }), + store_client_(plugin_manager.CreateObjectStoreClientInstance(object_manager_config.plugin_name)), periodical_runner_(io_service), report_resources_period_ms_(config.report_resources_period_ms), temp_dir_(config.temp_dir), @@ -291,7 +295,6 @@ NodeManager::NodeManager(instrumented_io_context &io_service, RayConfig::instance().min_memory_free_bytes(), RayConfig::instance().memory_monitor_refresh_ms(), CreateMemoryUsageRefreshCallback())) { - RAY_LOG(INFO) << "Initializing NodeManager with ID " << self_node_id_; cluster_resource_scheduler_ = std::make_shared( scheduling::NodeID(self_node_id_.Binary()), config.resource_config.ToResourceMap(), @@ -368,7 +371,7 @@ NodeManager::NodeManager(instrumented_io_context &io_service, [this]() { cluster_task_manager_->ScheduleAndDispatchTasks(); }, RayConfig::instance().worker_cap_initial_backoff_delay_ms()); - RAY_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str())); + RAY_CHECK_OK(store_client_->Connect(config.store_socket_name.c_str(),"")); // Run the node manger rpc server. node_manager_server_.RegisterService(node_manager_service_); node_manager_server_.RegisterService(agent_manager_service_); @@ -2101,7 +2104,7 @@ void NodeManager::MarkObjectsAsFailed( << error_type; std::shared_ptr data; Status status; - status = store_client_.TryCreateImmediately( + status = store_client_->TryCreateImmediately( object_id, ref.owner_address(), 0, @@ -2110,7 +2113,7 @@ void NodeManager::MarkObjectsAsFailed( &data, plasma::flatbuf::ObjectSource::ErrorStoredByRaylet); if (status.ok()) { - status = store_client_.Seal(object_id); + status = store_client_->Seal(object_id); } if (!status.ok() && !status.IsObjectExists()) { RAY_LOG(DEBUG) << "Marking plasma object failed " << object_id; @@ -2441,7 +2444,7 @@ bool NodeManager::GetObjectsFromPlasma(const std::vector &object_ids, // since we must wait for the plasma store's reply. We should consider using // an `AsyncGet` instead. if (!store_client_ - .Get(object_ids, /*timeout_ms=*/0, &plasma_results, /*is_from_worker=*/false) + ->Get(object_ids, /*timeout_ms=*/0, &plasma_results, /*is_from_worker=*/false) .ok()) { return false; } @@ -3084,3 +3087,4 @@ void NodeManager::ReportWorkerOOMKillStats() { } // namespace raylet } // namespace ray + diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 2982da44aff3..1c2515dd8e4f 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -47,6 +47,7 @@ #include "ray/common/bundle_spec.h" #include "ray/raylet/placement_group_resource_manager.h" #include "ray/raylet/worker_killing_policy.h" +#include "ray/object_manager/plasma/object_store_client_interface.h" // clang-format on namespace ray { @@ -695,7 +696,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler, /// A Plasma object store client. This is used for creating new objects in /// the object store (e.g., for actor tasks that can't be run because the /// actor died) and to pin objects that are in scope in the cluster. - plasma::PlasmaClient store_client_; + //plasma::PlasmaClient store_client_; + std::shared_ptr store_client_; /// The runner to run function periodically. PeriodicalRunner periodical_runner_; /// The period used for the resources report timer. @@ -843,3 +845,4 @@ class NodeManager : public rpc::NodeManagerServiceHandler, } // namespace raylet } // namespace ray + diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc index c2cfa3d09c68..d5eba6168a03 100644 --- a/src/ray/raylet/raylet.cc +++ b/src/ray/raylet/raylet.cc @@ -162,3 +162,4 @@ void Raylet::HandleAccept(const boost::system::error_code &error) { } // namespace raylet } // namespace ray +