From 3a0dd687c2415abc9177c06afea4665caa916e8b Mon Sep 17 00:00:00 2001 From: leomem Date: Thu, 13 Apr 2023 17:22:12 -0700 Subject: [PATCH] Add plugin base interfaces --- BUILD.bazel | 2 + .../store_provider/plasma_store_provider.h | 1 + src/ray/object_manager/object_buffer_pool.cc | 2 +- src/ray/object_manager/object_buffer_pool.h | 6 +- src/ray/object_manager/object_manager.cc | 4 +- src/ray/object_manager/plasma/client.h | 156 ++--------- .../plasma/object_store_client_interface.h | 249 ++++++++++++++++++ .../plasma/object_store_runner_interface.h | 63 +++++ src/ray/object_manager/plasma/store_runner.cc | 5 +- src/ray/object_manager/plasma/store_runner.h | 24 +- 10 files changed, 370 insertions(+), 142 deletions(-) create mode 100644 src/ray/object_manager/plasma/object_store_client_interface.h create mode 100644 src/ray/object_manager/plasma/object_store_runner_interface.h diff --git a/BUILD.bazel b/BUILD.bazel index c7b30fc61892..ee64c1fa7925 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -330,6 +330,7 @@ cc_library( }), hdrs = [ "src/ray/object_manager/common.h", + "src/ray/object_manager/plasma/object_store_client_interface.h", "src/ray/object_manager/plasma/client.h", "src/ray/object_manager/plasma/common.h", "src/ray/object_manager/plasma/compat.h", @@ -387,6 +388,7 @@ cc_library( "src/ray/object_manager/plasma/plasma_allocator.h", "src/ray/object_manager/plasma/stats_collector.h", "src/ray/object_manager/plasma/store.h", + "src/ray/object_manager/plasma/object_store_runner_interface.h", "src/ray/object_manager/plasma/store_runner.h", "src/ray/thirdparty/dlmalloc.c", ], diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h index 4bcfad197b1f..7e8541acaae9 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.h +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -22,6 +22,7 @@ #include "ray/core_worker/common.h" #include "ray/core_worker/context.h" #include "ray/core_worker/reference_count.h" +#include "ray/object_manager/plasma/object_store_client_interface.h" #include "ray/object_manager/plasma/client.h" #include "ray/raylet_client/raylet_client.h" diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index 8004fb588811..b939142ae559 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -21,7 +21,7 @@ namespace ray { ObjectBufferPool::ObjectBufferPool( - std::shared_ptr store_client, uint64_t chunk_size) + std::shared_ptr store_client, uint64_t chunk_size) : store_client_(store_client), default_chunk_size_(chunk_size) {} ObjectBufferPool::~ObjectBufferPool() { diff --git a/src/ray/object_manager/object_buffer_pool.h b/src/ray/object_manager/object_buffer_pool.h index e514b2d72516..a71b33670155 100644 --- a/src/ray/object_manager/object_buffer_pool.h +++ b/src/ray/object_manager/object_buffer_pool.h @@ -27,7 +27,7 @@ #include "ray/common/id.h" #include "ray/common/status.h" #include "ray/object_manager/memory_object_reader.h" -#include "ray/object_manager/plasma/client.h" +#include "ray/object_manager/plasma/object_store_runner_interface.h" namespace ray { @@ -60,7 +60,7 @@ class ObjectBufferPool { /// /// \param store_client Plasma store client. Used for testing purposes only. /// \param chunk_size The chunk size into which objects are to be split. - ObjectBufferPool(std::shared_ptr store_client, + ObjectBufferPool(std::shared_ptr store_client, const uint64_t chunk_size); ~ObjectBufferPool(); @@ -214,7 +214,7 @@ class ObjectBufferPool { GUARDED_BY(pool_mutex_); /// Plasma client pool. - std::shared_ptr store_client_; + std::shared_ptr store_client_; /// Determines the maximum chunk size to be transferred by a single thread. const uint64_t default_chunk_size_; diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 18d3e76eb297..9fa18df3aef8 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -36,8 +36,10 @@ ObjectStoreRunner::ObjectStoreRunner(const ObjectManagerConfig &config, config.plasma_directory, config.fallback_directory)); // Initialize object store. + std::map emptyMap; store_thread_ = std::thread(&plasma::PlasmaStoreRunner::Start, plasma::plasma_store_runner.get(), + std::ref(emptyMap), spill_objects_callback, object_store_full_callback, add_object_callback, @@ -160,7 +162,7 @@ void ObjectManager::Stop() { } bool ObjectManager::IsPlasmaObjectSpillable(const ObjectID &object_id) { - return plasma::plasma_store_runner->IsPlasmaObjectSpillable(object_id); + return plasma::plasma_store_runner->IsObjectSpillable(object_id); } void ObjectManager::RunRpcService(int index) { diff --git a/src/ray/object_manager/plasma/client.h b/src/ray/object_manager/plasma/client.h index d466528ecd27..00ca5ab59288 100644 --- a/src/ray/object_manager/plasma/client.h +++ b/src/ray/object_manager/plasma/client.h @@ -25,125 +25,14 @@ #include "ray/common/buffer.h" #include "ray/common/status.h" #include "ray/object_manager/plasma/common.h" +#include "ray/object_manager/plasma/object_store_client_interface.h" #include "ray/util/visibility.h" #include "src/ray/protobuf/common.pb.h" -namespace plasma { - -using ray::Buffer; -using ray::SharedMemoryBuffer; -using ray::Status; - -/// Object buffer data structure. -struct ObjectBuffer { - /// The data buffer. - std::shared_ptr data; - /// The metadata buffer. - std::shared_ptr metadata; - /// The device number. - int device_num; -}; - -class PlasmaClientInterface { - public: - virtual ~PlasmaClientInterface(){}; - - /// Tell Plasma that the client no longer needs the object. This should be - /// called after Get() or Create() when the client is done with the object. - /// After this call, the buffer returned by Get() is no longer valid. - /// - /// \param object_id The ID of the object that is no longer needed. - /// \return The return status. - virtual Status Release(const ObjectID &object_id) = 0; - - /// Disconnect from the local plasma instance, including the local store and - /// manager. - /// - /// \return The return status. - virtual Status Disconnect() = 0; - - /// Get some objects from the Plasma Store. This function will block until the - /// objects have all been created and sealed in the Plasma Store or the - /// timeout expires. - /// - /// If an object was not retrieved, the corresponding metadata and data - /// fields in the ObjectBuffer structure will evaluate to false. - /// Objects are automatically released by the client when their buffers - /// get out of scope. - /// - /// \param object_ids The IDs of the objects to get. - /// \param timeout_ms The amount of time in milliseconds to wait before this - /// request times out. If this value is -1, then no timeout is set. - /// \param[out] object_buffers The object results. - /// \param is_from_worker Whether or not if the Get request comes from a Ray workers. - /// \return The return status. - virtual Status Get(const std::vector &object_ids, - int64_t timeout_ms, - std::vector *object_buffers, - bool is_from_worker) = 0; - /// Seal an object in the object store. The object will be immutable after - /// this - /// call. - /// - /// \param object_id The ID of the object to seal. - /// \return The return status. - virtual Status Seal(const ObjectID &object_id) = 0; - - /// Abort an unsealed object in the object store. If the abort succeeds, then - /// it will be as if the object was never created at all. The unsealed object - /// must have only a single reference (the one that would have been removed by - /// calling Seal). - /// - /// \param object_id The ID of the object to abort. - /// \return The return status. - virtual Status Abort(const ObjectID &object_id) = 0; - - /// Create an object in the Plasma Store. Any metadata for this object must be - /// be passed in when the object is created. - /// - /// If this request cannot be fulfilled immediately, this call will block until - /// enough objects have been spilled to make space. If spilling cannot free - /// enough space, an out of memory error will be returned. - /// - /// \param object_id The ID to use for the newly created object. - /// \param owner_address The address of the object's owner. - /// \param data_size The size in bytes of the space to be allocated for this - /// object's - /// data (this does not include space used for metadata). - /// \param metadata The object's metadata. If there is no metadata, this - /// pointer should be NULL. - /// \param metadata_size The size in bytes of the metadata. If there is no - /// metadata, this should be 0. - /// \param data The address of the newly created object will be written here. - /// \param device_num The number of the device where the object is being - /// created. - /// device_num = 0 corresponds to the host, - /// device_num = 1 corresponds to GPU0, - /// device_num = 2 corresponds to GPU1, etc. - /// \return The return status. - /// - /// The returned object must be released once it is done with. It must also - /// be either sealed or aborted. - virtual Status CreateAndSpillIfNeeded(const ObjectID &object_id, - const ray::rpc::Address &owner_address, - int64_t data_size, - const uint8_t *metadata, - int64_t metadata_size, - std::shared_ptr *data, - plasma::flatbuf::ObjectSource source, - int device_num = 0) = 0; - - /// Delete a list of objects from the object store. This currently assumes that the - /// object is present, has been sealed and not used by another client. Otherwise, - /// it is a no operation. - /// - /// \param object_ids The list of IDs of the objects to delete. - /// \return The return status. If all the objects are non-existent, return OK. - virtual Status Delete(const std::vector &object_ids) = 0; -}; +namespace plasma { -class PlasmaClient : public PlasmaClientInterface { +class PlasmaClient : public ObjectStoreClientInterface{ public: PlasmaClient(); ~PlasmaClient(); @@ -163,7 +52,7 @@ class PlasmaClient : public PlasmaClientInterface { Status Connect(const std::string &store_socket_name, const std::string &manager_socket_name = "", int release_delay = 0, - int num_retries = -1); + int num_retries = -1) override; /// Create an object in the Plasma Store. Any metadata for this object must be /// be passed in when the object is created. @@ -198,7 +87,7 @@ class PlasmaClient : public PlasmaClientInterface { int64_t metadata_size, std::shared_ptr *data, plasma::flatbuf::ObjectSource source, - int device_num = 0); + int device_num = 0) override; /// Create an object in the Plasma Store. Any metadata for this object must be /// be passed in when the object is created. @@ -233,7 +122,7 @@ class PlasmaClient : public PlasmaClientInterface { int64_t metadata_size, std::shared_ptr *data, plasma::flatbuf::ObjectSource source, - int device_num = 0); + int device_num = 0) override; /// Get some objects from the Plasma Store. This function will block until the /// objects have all been created and sealed in the Plasma Store or the @@ -253,7 +142,7 @@ class PlasmaClient : public PlasmaClientInterface { Status Get(const std::vector &object_ids, int64_t timeout_ms, std::vector *object_buffers, - bool is_from_worker); + bool is_from_worker) override; /// Tell Plasma that the client no longer needs the object. This should be /// called after Get() or Create() when the client is done with the object. @@ -261,7 +150,7 @@ class PlasmaClient : public PlasmaClientInterface { /// /// \param object_id The ID of the object that is no longer needed. /// \return The return status. - Status Release(const ObjectID &object_id); + Status Release(const ObjectID &object_id) override; /// Check if the object store contains a particular object and the object has /// been sealed. The result will be stored in has_object. @@ -273,7 +162,7 @@ class PlasmaClient : public PlasmaClientInterface { /// \param has_object The function will write true at this address if /// the object is present and false if it is not present. /// \return The return status. - Status Contains(const ObjectID &object_id, bool *has_object); + Status Contains(const ObjectID &object_id, bool *has_object) override; /// Abort an unsealed object in the object store. If the abort succeeds, then /// it will be as if the object was never created at all. The unsealed object @@ -282,7 +171,7 @@ class PlasmaClient : public PlasmaClientInterface { /// /// \param object_id The ID of the object to abort. /// \return The return status. - Status Abort(const ObjectID &object_id); + Status Abort(const ObjectID &object_id) override; /// Seal an object in the object store. The object will be immutable after /// this @@ -290,7 +179,7 @@ class PlasmaClient : public PlasmaClientInterface { /// /// \param object_id The ID of the object to seal. /// \return The return status. - Status Seal(const ObjectID &object_id); + Status Seal(const ObjectID &object_id) override; /// Delete an object from the object store. This currently assumes that the /// object is present, has been sealed and not used by another client. Otherwise, @@ -309,7 +198,7 @@ class PlasmaClient : public PlasmaClientInterface { /// /// \param object_ids The list of IDs of the objects to delete. /// \return The return status. If all the objects are non-existent, return OK. - Status Delete(const std::vector &object_ids); + Status Delete(const std::vector &object_ids) override; /// Delete objects until we have freed up num_bytes bytes or there are no more /// released objects that can be deleted. @@ -318,23 +207,36 @@ class PlasmaClient : public PlasmaClientInterface { /// \param num_bytes_evicted Out parameter for total number of bytes of space /// retrieved. /// \return The return status. - Status Evict(int64_t num_bytes, int64_t &num_bytes_evicted); + Status Evict(int64_t num_bytes, int64_t &num_bytes_evicted) override; /// Disconnect from the local plasma instance, including the local store and /// manager. /// /// \return The return status. - Status Disconnect(); + Status Disconnect() override; /// Get the current debug string from the plasma store server. /// /// \return The debug string. - std::string DebugString(); + std::string DebugString() override; /// Get the memory capacity of the store. /// /// \return Memory capacity of the store in bytes. - int64_t store_capacity(); + int64_t store_capacity() override; + + Status Authenticate(const std::string& user, + const std::string& passwd) override { + return Status::OK(); + }; + + Status Authenticate(const std::string& secret) override { + return Status::OK(); + }; + + void MemCpy(void* dest, void* src, size_t len) override { + memcpy(dest, src, len); + }; private: /// Retry a previous create call using the returned request ID. diff --git a/src/ray/object_manager/plasma/object_store_client_interface.h b/src/ray/object_manager/plasma/object_store_client_interface.h new file mode 100644 index 000000000000..d22383a9238c --- /dev/null +++ b/src/ray/object_manager/plasma/object_store_client_interface.h @@ -0,0 +1,249 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include + +#include "ray/common/buffer.h" +#include "ray/common/status.h" +#include "ray/object_manager/plasma/common.h" +#include "ray/util/visibility.h" +#include "src/ray/protobuf/common.pb.h" + +namespace plasma { + +using ray::Buffer; +using ray::SharedMemoryBuffer; +using ray::Status; + +/// Object buffer data structure. +struct ObjectBuffer { + /// The data buffer. + std::shared_ptr data; + /// The metadata buffer. + std::shared_ptr metadata; + /// The device number. + int device_num; +}; + +class ObjectStoreClientInterface : public std::enable_shared_from_this { + public: + virtual ~ObjectStoreClientInterface(){}; + + /// Tell Store that the client no longer needs the object. This should be + /// called after Get() or Create() when the client is done with the object. + /// After this call, the buffer returned by Get() is no longer valid. + /// + /// \param object_id The ID of the object that is no longer needed. + /// \return The return status. + virtual Status Release(const ObjectID &object_id) = 0; + + /// Disconnect from the Store. + /// + /// \return The return status. + virtual Status Disconnect() = 0; + + /// Get some objects from the Store. This function will block until the + /// objects have all been created and sealed in the Plasma Store or the + /// timeout expires. + /// + /// If an object was not retrieved, the corresponding metadata and data + /// fields in the ObjectBuffer structure will evaluate to false. + /// Objects are automatically released by the client when their buffers + /// get out of scope. + /// + /// \param object_ids The IDs of the objects to get. + /// \param timeout_ms The amount of time in milliseconds to wait before this + /// request times out. If this value is -1, then no timeout is set. + /// \param[out] object_buffers The object results. + /// \param is_from_worker Whether or not if the Get request comes from a Ray workers. + /// \return The return status. + virtual Status Get(const std::vector &object_ids, + int64_t timeout_ms, + std::vector *object_buffers, + bool is_from_worker) = 0; + + /// Seal an object in the object store. The object will be immutable after + /// this + /// call. + /// + /// \param object_id The ID of the object to seal. + /// \return The return status. + virtual Status Seal(const ObjectID &object_id) = 0; + + /// Abort an unsealed object in the object store. If the abort succeeds, then + /// it will be as if the object was never created at all. The unsealed object + /// must have only a single reference (the one that would have been removed by + /// calling Seal). + /// + /// \param object_id The ID of the object to abort. + /// \return The return status. + virtual Status Abort(const ObjectID &object_id) = 0; + + /// Create an object in the Plasma Store. Any metadata for this object must be + /// be passed in when the object is created. + /// + /// If this request cannot be fulfilled immediately, this call will block until + /// enough objects have been spilled to make space. If spilling cannot free + /// enough space, an out of memory error will be returned. + /// + /// \param object_id The ID to use for the newly created object. + /// \param owner_address The address of the object's owner. + /// \param data_size The size in bytes of the space to be allocated for this + /// object's + /// data (this does not include space used for metadata). + /// \param metadata The object's metadata. If there is no metadata, this + /// pointer should be NULL. + /// \param metadata_size The size in bytes of the metadata. If there is no + /// metadata, this should be 0. + /// \param data The address of the newly created object will be written here. + /// \param device_num The number of the device where the object is being + /// created. + /// device_num = 0 corresponds to the host, + /// device_num = 1 corresponds to GPU0, + /// device_num = 2 corresponds to GPU1, etc. + /// \return The return status. + /// + /// The returned object must be released once it is done with. It must also + /// be either sealed or aborted. + virtual Status CreateAndSpillIfNeeded(const ObjectID &object_id, + const ray::rpc::Address &owner_address, + int64_t data_size, + const uint8_t *metadata, + int64_t metadata_size, + std::shared_ptr *data, + plasma::flatbuf::ObjectSource source, + int device_num = 0) = 0; + + /// Delete a list of objects from the object store. This currently assumes that the + /// object is present, has been sealed and not used by another client. Otherwise, + /// it is a no operation. + /// + /// \param object_ids The list of IDs of the objects to delete. + /// \return The return status. If all the objects are non-existent, return OK. + virtual Status Delete(const std::vector &object_ids) = 0; + + /// Connect to the store. Return the resulting connection. + /// + /// \param store_socket_name The name of the UNIX domain socket to use to + /// connect to the store. + /// \param manager_socket_name The name of the UNIX domain socket to use to + /// connect to the manager. If this is "", then this + /// function will not connect to a manager. + /// Note that manager is no longer supported, this function + /// will return failure if this is not "". + /// \param release_delay Deprecated (not used). + /// \param num_retries number of attempts to connect to IPC socket, default 50 + /// \return The return status. + virtual Status Connect(const std::string &store_socket_name, + const std::string &manager_socket_name, + int release_delay = 0, + int num_retries = -1) = 0; + + /// Authentication to the object store. + /// \param user user name. + /// \param passwd password + /// \return The return status. + virtual Status Authenticate(const std::string& user, + const std::string& passwd) = 0; + + /// Authentication to the object store. + /// \param secret The user secret. + /// \return The return status. + virtual Status Authenticate(const std::string& secret) = 0; + + /// The object storage optimized memory copy. + /// \param dest The destination address. + /// \param src The source address. + /// \param len Number of bytes to copy. + virtual void MemCpy(void* dest, + void* src, + size_t len) = 0; + + /// Create an object in the Plasma Store. Any metadata for this object must be + /// be passed in when the object is created. + /// + /// The plasma store will attempt to fulfill this request immediately. If it + /// cannot be fulfilled immediately, an error will be returned to the client. + /// + /// \param object_id The ID to use for the newly created object. + /// \param owner_address The address of the object's owner. + /// \param data_size The size in bytes of the space to be allocated for this + /// object's + /// data (this does not include space used for metadata). + /// \param metadata The object's metadata. If there is no metadata, this + /// pointer + /// should be NULL. + /// \param metadata_size The size in bytes of the metadata. If there is no + /// metadata, this should be 0. + /// \param data The address of the newly created object will be written here. + /// \param device_num The number of the device where the object is being + /// created. + /// device_num = 0 corresponds to the host, + /// device_num = 1 corresponds to GPU0, + /// device_num = 2 corresponds to GPU1, etc. + /// \return The return status. + /// + /// The returned object must be released once it is done with. It must also + /// be either sealed or aborted. + virtual Status TryCreateImmediately(const ObjectID &object_id, + const ray::rpc::Address &owner_address, + int64_t data_size, + const uint8_t *metadata, + int64_t metadata_size, + std::shared_ptr *data, + plasma::flatbuf::ObjectSource source, + int device_num = 0) = 0; + + /// Check if the object store contains a particular object and the object has + /// been sealed. The result will be stored in has_object. + /// + /// @todo: We may want to indicate if the object has been created but not + /// sealed. + /// + /// \param object_id The ID of the object whose presence we are checking. + /// \param has_object The function will write true at this address if + /// the object is present and false if it is not present. + /// \return The return status. + virtual Status Contains(const ObjectID &object_id, bool *has_object) = 0; + + /// Delete objects until we have freed up num_bytes bytes or there are no more + /// released objects that can be deleted. + /// + /// \param num_bytes The number of bytes to try to free up. + /// \param num_bytes_evicted Out parameter for total number of bytes of space + /// retrieved. + /// \return The return status. + virtual Status Evict(int64_t num_bytes, int64_t &num_bytes_evicted) = 0; + + /// Get the current debug string from the plasma store server. + /// + /// \return The debug string. + virtual std::string DebugString() = 0; + + /// Get the memory capacity of the store. + /// + /// \return Memory capacity of the store in bytes. + virtual int64_t store_capacity() = 0; + +}; + +} // namespace plasma diff --git a/src/ray/object_manager/plasma/object_store_runner_interface.h b/src/ray/object_manager/plasma/object_store_runner_interface.h new file mode 100644 index 000000000000..40a56a8ce6cc --- /dev/null +++ b/src/ray/object_manager/plasma/object_store_runner_interface.h @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include + +#include "ray/common/buffer.h" +#include "ray/common/status.h" +#include "ray/object_manager/plasma/common.h" +#include "ray/util/visibility.h" +#include "src/ray/protobuf/common.pb.h" + +namespace plasma { + +class ObjectStoreRunnerInterface : public std::enable_shared_from_this { + public: + virtual ~ObjectStoreRunnerInterface(){}; + + /// Start the object store runner. The params are a series of key-value. + virtual void Start(const std::map& params, + ray::SpillObjectsCallback spill_objects_callback, + std::function object_store_full_callback, + ray::AddObjectCallback add_object_callback, + ray::DeleteObjectCallback delete_object_callback) = 0; + + // Stop the object store + virtual void Stop() = 0; + + virtual bool IsObjectSpillable(const ObjectID &object_id) = 0; + + virtual int64_t GetConsumedBytes() = 0; + + virtual int64_t GetFallbackAllocated() const = 0; + + virtual void GetAvailableMemoryAsync(std::function callback) const = 0; + + // The Current total allocated available memory size + virtual int64_t GetTotalMemorySize() const = 0; + + // Maximal available memory size. It can be different from the Total memory + // size if the memory is dynamically expandable. + virtual int64_t GetMaxMemorySize() const = 0; +}; + +} // namespace plasma diff --git a/src/ray/object_manager/plasma/store_runner.cc b/src/ray/object_manager/plasma/store_runner.cc index ca1f8a71e455..9b0dfd4dd9a1 100644 --- a/src/ray/object_manager/plasma/store_runner.cc +++ b/src/ray/object_manager/plasma/store_runner.cc @@ -80,7 +80,8 @@ PlasmaStoreRunner::PlasmaStoreRunner(std::string socket_name, fallback_directory_ = fallback_directory; } -void PlasmaStoreRunner::Start(ray::SpillObjectsCallback spill_objects_callback, +void PlasmaStoreRunner::Start(const std::map& params, + ray::SpillObjectsCallback spill_objects_callback, std::function object_store_full_callback, ray::AddObjectCallback add_object_callback, ray::DeleteObjectCallback delete_object_callback) { @@ -137,7 +138,7 @@ void PlasmaStoreRunner::Shutdown() { } } -bool PlasmaStoreRunner::IsPlasmaObjectSpillable(const ObjectID &object_id) { +bool PlasmaStoreRunner::IsObjectSpillable(const ObjectID &object_id) { return store_->IsObjectSpillable(object_id); } diff --git a/src/ray/object_manager/plasma/store_runner.h b/src/ray/object_manager/plasma/store_runner.h index 22e31b014684..2a8d2452ed79 100644 --- a/src/ray/object_manager/plasma/store_runner.h +++ b/src/ray/object_manager/plasma/store_runner.h @@ -8,28 +8,36 @@ #include "ray/common/file_system_monitor.h" #include "ray/object_manager/plasma/plasma_allocator.h" #include "ray/object_manager/plasma/store.h" +#include "ray/object_manager/plasma/object_store_runner_interface.h" namespace plasma { -class PlasmaStoreRunner { +class PlasmaStoreRunner : public ObjectStoreRunnerInterface { public: PlasmaStoreRunner(std::string socket_name, int64_t system_memory, bool hugepages_enabled, std::string plasma_directory, std::string fallback_directory); - void Start(ray::SpillObjectsCallback spill_objects_callback, + + void Start(const std::map& params, + ray::SpillObjectsCallback spill_objects_callback, std::function object_store_full_callback, ray::AddObjectCallback add_object_callback, - ray::DeleteObjectCallback delete_object_callback); - void Stop(); + ray::DeleteObjectCallback delete_object_callback) override; - bool IsPlasmaObjectSpillable(const ObjectID &object_id); + void Stop() override; - int64_t GetConsumedBytes(); - int64_t GetFallbackAllocated() const; + bool IsObjectSpillable(const ObjectID &object_id) override; - void GetAvailableMemoryAsync(std::function callback) const { + int64_t GetConsumedBytes() override; + int64_t GetFallbackAllocated() const override; + + // The total memor size may be the same as the max memory size + int64_t GetTotalMemorySize() const override { return system_memory_; }; + int64_t GetMaxMemorySize() const override { return system_memory_; }; + + void GetAvailableMemoryAsync(std::function callback) const override{ main_service_.post([this, callback]() { store_->GetAvailableMemory(callback); }, "PlasmaStoreRunner.GetAvailableMemory"); }