Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 36 additions & 7 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ cc_library(
"src/ray/object_manager/plasma/plasma_generated.h",
"src/ray/object_manager/plasma/protocol.h",
"src/ray/object_manager/plasma/shared_memory.h",
"src/ray/object_manager/plasma/object_store_client_interface.h",
] + select({
"@bazel_tools//src/conditions:windows": [
],
Expand Down Expand Up @@ -388,6 +389,7 @@ cc_library(
"src/ray/object_manager/plasma/stats_collector.h",
"src/ray/object_manager/plasma/store.h",
"src/ray/object_manager/plasma/store_runner.h",
"src/ray/object_manager/plasma/object_store_runner_interface.h",
"src/ray/thirdparty/dlmalloc.c",
],
copts = PLASMA_COPTS,
Expand Down Expand Up @@ -821,14 +823,19 @@ cc_library(
"src/ray/core_worker/store_provider/*.h",
"src/ray/core_worker/store_provider/memory_store/*.h",
"src/ray/core_worker/transport/*.h",
]),
]) + [
"src/ray/object_manager/plugin_manager.h",
] + glob([
"src/ray/object_manager/plasma/*.h",
])
,
copts = COPTS,
strip_include_prefix = "src",
visibility = ["//visibility:public"],
deps = [
":gcs",
":gcs_client_lib",
":plasma_client",
":plugin_manager",
":ray_common",
":ray_util",
":raylet_client_lib",
Expand Down Expand Up @@ -2369,13 +2376,33 @@ cc_test(
"@com_google_googletest//:gtest_main",
],
)

cc_library(
name = "plugin_manager",
srcs = [
"src/ray/object_manager/plugin_manager.cc",
],
hdrs = [
"src/ray/object_manager/plugin_manager.h",
],
copts = PLASMA_COPTS,
linkopts = PLASMA_LINKOPTS,
strip_include_prefix = "src",
deps = [
":plasma_client",
":plasma_store_server_lib",
],
)
cc_library(
name = "object_manager",
srcs = glob([
"src/ray/object_manager/*.cc",
"src/ray/object_manager/notification/*.cc",
]),
srcs = glob(
[
"src/ray/object_manager/*.cc",
"src/ray/object_manager/notification/*.cc",
],
exclude = [
"src/ray/object_manager/plugin_manager.cc",
],
),
hdrs = glob([
"src/ray/object_manager/*.h",
"src/ray/object_manager/notification/*.h",
Expand All @@ -2387,6 +2414,7 @@ cc_library(
":gcs",
":object_manager_rpc",
":plasma_store_server_lib",
":plugin_manager",
":ray_common",
":ray_util",
"@boost//:asio",
Expand Down Expand Up @@ -3017,3 +3045,4 @@ genrule(
""",
local = 1,
)

1 change: 1 addition & 0 deletions cpp/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -420,3 +420,4 @@ py_test(
},
tags = ["team:core"],
)

26 changes: 26 additions & 0 deletions cpp/src/ray/config_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,21 @@ ABSL_FLAG(std::string,
"The namespace of job. If not set,"
" a unique value will be randomly generated.");

ABSL_FLAG(std::string,
ray_plugin_name,
"default",
"The name of the object store plugin.");

ABSL_FLAG(std::string,
ray_plugin_path,
"",
"The path to the object store plugin library.");

ABSL_FLAG(std::string,
ray_plugin_params,
"{}",
"The paramters input of the object store plugin.");

using json = nlohmann::json;

namespace ray {
Expand Down Expand Up @@ -170,7 +185,17 @@ void ConfigInternal::Init(RayConfig &config, int argc, char **argv) {
if (!FLAGS_ray_node_ip_address.CurrentValue().empty()) {
node_ip_address = FLAGS_ray_node_ip_address.CurrentValue();
}
if (!FLAGS_ray_plugin_name.CurrentValue().empty()){
plugin_name = FLAGS_ray_plugin_name.CurrentValue();
}
if (!FLAGS_ray_plugin_path.CurrentValue().empty()){
plugin_path = FLAGS_ray_plugin_path.CurrentValue();
}
if (!FLAGS_ray_plugin_params.CurrentValue().empty()){
plugin_params = FLAGS_ray_plugin_params.CurrentValue();
}
if (!FLAGS_ray_head_args.CurrentValue().empty()) {
RAY_LOG(INFO) << FLAGS_ray_head_args.CurrentValue();
std::vector<std::string> args =
absl::StrSplit(FLAGS_ray_head_args.CurrentValue(), ' ', absl::SkipEmpty());
head_args.insert(head_args.end(), args.begin(), args.end());
Expand Down Expand Up @@ -251,3 +276,4 @@ void ConfigInternal::UpdateSessionDir(const std::string dir) {
}
} // namespace internal
} // namespace ray

7 changes: 7 additions & 0 deletions cpp/src/ray/config_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ class ConfigInternal {

std::vector<std::string> head_args = {};

std::string plugin_name = "default";

std::string plugin_path = "";

std::string plugin_params = "{}";

boost::optional<RuntimeEnv> runtime_env;

int runtime_env_hash = 0;
Expand Down Expand Up @@ -94,3 +100,4 @@ class ConfigInternal {

} // namespace internal
} // namespace ray

6 changes: 5 additions & 1 deletion cpp/src/ray/util/process_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback)
node_ip = GetNodeIpAddress();
}
}

std::unique_ptr<ray::gcs::GlobalStateAccessor> global_state_accessor =
CreateGlobalStateAccessor(bootstrap_address);
if (ConfigInternal::Instance().worker_type == WorkerType::DRIVER) {
Expand Down Expand Up @@ -129,6 +129,9 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback)
options.job_id = global_state_accessor->GetNextJobID();
}
}
options.plugin_name = ConfigInternal::Instance().plugin_name;
options.plugin_path = ConfigInternal::Instance().plugin_path;
options.plugin_params = ConfigInternal::Instance().plugin_params;
options.gcs_options = gcs_options;
options.enable_logging = true;
options.log_dir = ConfigInternal::Instance().logs_dir;
Expand Down Expand Up @@ -174,3 +177,4 @@ void ProcessHelper::RayStop() {

} // namespace internal
} // namespace ray

45 changes: 43 additions & 2 deletions python/ray/_private/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
# using logging.basicConfig in its entry/init points.
logger = logging.getLogger(__name__)


class Node:
"""An encapsulation of the Ray processes on a single node.

Expand Down Expand Up @@ -285,6 +284,38 @@ def __init__(
if head:
self.start_head_processes()

if ray_params.plugin_name:
self._plugin_name = ray_params.plugin_name
os.environ[ray_constants.RAY_PLUGIN_NAME_ENVIRONMENT_VARIABLE] = ray_params.plugin_name
else:
env_plugin_name = os.environ.get(ray_constants.RAY_PLUGIN_NAME_ENVIRONMENT_VARIABLE)
if env_plugin_name is not None and env_plugin_name != "":
self._plugin_name = env_plugin_name
else:
self._plugin_name = "default"


if ray_params.plugin_path:
self._plugin_path = ray_params.plugin_path
os.environ[ray_constants.RAY_PLUGIN_PATH_ENVIRONMENT_VARIABLE] = ray_params.plugin_path
else:
env_plugin_path = os.environ.get(ray_constants.RAY_PLUGIN_PATH_ENVIRONMENT_VARIABLE)
if env_plugin_path is not None and env_plugin_path != "":
self._plugin_path = env_plugin_path
else:
self._plugin_path = ""


if ray_params.plugin_params:
self._plugin_params_str = json.dumps(ray_params.plugin_params)
os.environ[ray_constants.RAY_PLUGIN_PARAMS_ENVIRONMENT_VARIABLE_STR] = ray_params.plugin_params
else:
env_plugin_params_str = os.environ.get(ray_constants.RAY_PLUGIN_PARAMS_ENVIRONMENT_VARIABLE_STR)
if env_plugin_params_str is not None and env_plugin_params_str != "":
self._plugin_params_str = env_plugin_params_str
else:
self._plugin_params_str = "{}"

if not connect_only:
self.start_ray_processes()
# we should update the address info after the node has been started
Expand Down Expand Up @@ -987,6 +1018,9 @@ def start_raylet(
self,
plasma_directory: str,
object_store_memory: int,
plugin_name: str,
plugin_path: str,
plugin_params: str,
use_valgrind: bool = False,
use_profiler: bool = False,
):
Expand Down Expand Up @@ -1016,6 +1050,9 @@ def start_raylet(
self.get_resource_spec(),
plasma_directory,
object_store_memory,
plugin_name,
plugin_path,
plugin_params,
self.session_name,
is_head_node=self.is_head(),
min_worker_port=self._ray_params.min_worker_port,
Expand Down Expand Up @@ -1211,7 +1248,10 @@ def start_ray_processes(self):
plasma_directory=self._ray_params.plasma_directory,
huge_pages=self._ray_params.huge_pages,
)
self.start_raylet(plasma_directory, object_store_memory)

self.start_raylet(plasma_directory, object_store_memory, self._plugin_name,
self._plugin_path,
self._plugin_params_str)
if self._ray_params.include_log_monitor:
self.start_log_monitor()

Expand Down Expand Up @@ -1580,3 +1620,4 @@ def _record_stats(self):
"redis" if os.environ.get("RAY_REDIS_ADDRESS") is not None else "memory"
)
record_extra_usage_tag(TagKey.GCS_STORAGE, gcs_storage_type)

7 changes: 7 additions & 0 deletions python/ray/_private/parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ class RayParams:

def __init__(
self,
plugin_name: Optional[str] = None,
plugin_path: Optional[str] = None,
plugin_params: Optional[Dict[str, any]] = None,
redis_address: Optional[str] = None,
gcs_address: Optional[str] = None,
num_cpus: Optional[int] = None,
Expand Down Expand Up @@ -177,6 +180,9 @@ def __init__(
session_name: Optional[str] = None,
webui: Optional[str] = None,
):
self.plugin_name = plugin_name
self.plugin_path = plugin_path
self.plugin_params = plugin_params
self.redis_address = redis_address
self.gcs_address = gcs_address
self.num_cpus = num_cpus
Expand Down Expand Up @@ -438,3 +444,4 @@ def _format_ports(self, pre_selected_ports):
port_range_str = f"from {min_port} to {max_port}"
ports[comp] = f"{len(port_list)} ports {port_range_str}"
return ports

6 changes: 5 additions & 1 deletion python/ray/_private/ray_constants.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Ray constants used in the Python code."""
f"""Ray constants used in the Python code."""

import logging
import os
Expand Down Expand Up @@ -425,3 +425,7 @@ def gcs_actor_scheduling_enabled():
}

RAY_ENABLE_RECORD_TASK_LOGGING = env_bool("RAY_ENABLE_RECORD_TASK_LOGGING", False)

RAY_PLUGIN_NAME_ENVIRONMENT_VARIABLE = "RAY_PLUGIN_NAME"
RAY_PLUGIN_PATH_ENVIRONMENT_VARIABLE = "RAY_PLUGIN_PATH"
RAY_PLUGIN_PARAMS_ENVIRONMENT_VARIABLE_STR = "RAY_PLUGIN_PARAMS_STR"
9 changes: 7 additions & 2 deletions python/ray/_private/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -1356,6 +1356,9 @@ def start_raylet(
resource_spec,
plasma_directory: str,
object_store_memory: int,
plugin_name: str,
plugin_path: str,
plugin_params: str,
session_name: str,
is_head_node: bool,
min_worker_port: Optional[int] = None,
Expand Down Expand Up @@ -1602,12 +1605,14 @@ def start_raylet(
f"--metrics-agent-port={metrics_agent_port}",
f"--metrics_export_port={metrics_export_port}",
f"--object_store_memory={object_store_memory}",
f"--plugin_name={plugin_name}",
f"--plugin_path={plugin_path}",
f"--plugin_params={plugin_params}",
f"--plasma_directory={plasma_directory}",
f"--ray-debugger-external={1 if ray_debugger_external else 0}",
f"--gcs-address={gcs_address}",
f"--session-name={session_name}",
]

if is_head_node:
command.append("--head")

Expand Down Expand Up @@ -1637,7 +1642,6 @@ def start_raylet(
fate_share=fate_share,
env_updates=env_updates,
)

return process_info


Expand Down Expand Up @@ -2018,3 +2022,4 @@ def start_ray_client_server(
fate_share=fate_share,
)
return process_info

Loading