Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ cc_library(
}),
hdrs = [
"src/ray/object_manager/common.h",
"src/ray/object_manager/plasma/object_store_client_interface.h",
"src/ray/object_manager/plasma/client.h",
"src/ray/object_manager/plasma/common.h",
"src/ray/object_manager/plasma/compat.h",
Expand Down Expand Up @@ -387,6 +388,7 @@ cc_library(
"src/ray/object_manager/plasma/plasma_allocator.h",
"src/ray/object_manager/plasma/stats_collector.h",
"src/ray/object_manager/plasma/store.h",
"src/ray/object_manager/plasma/object_store_runner_interface.h",
"src/ray/object_manager/plasma/store_runner.h",
"src/ray/thirdparty/dlmalloc.c",
],
Expand Down
1 change: 1 addition & 0 deletions src/ray/core_worker/store_provider/plasma_store_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "ray/core_worker/common.h"
#include "ray/core_worker/context.h"
#include "ray/core_worker/reference_count.h"
#include "ray/object_manager/plasma/object_store_client_interface.h"
#include "ray/object_manager/plasma/client.h"
#include "ray/raylet_client/raylet_client.h"

Expand Down
2 changes: 1 addition & 1 deletion src/ray/object_manager/object_buffer_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
namespace ray {

ObjectBufferPool::ObjectBufferPool(
std::shared_ptr<plasma::PlasmaClientInterface> store_client, uint64_t chunk_size)
std::shared_ptr<plasma::ObjectStoreClientInterface> store_client, uint64_t chunk_size)
: store_client_(store_client), default_chunk_size_(chunk_size) {}

ObjectBufferPool::~ObjectBufferPool() {
Expand Down
6 changes: 3 additions & 3 deletions src/ray/object_manager/object_buffer_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#include "ray/common/id.h"
#include "ray/common/status.h"
#include "ray/object_manager/memory_object_reader.h"
#include "ray/object_manager/plasma/client.h"
#include "ray/object_manager/plasma/object_store_runner_interface.h"

namespace ray {

Expand Down Expand Up @@ -60,7 +60,7 @@ class ObjectBufferPool {
///
/// \param store_client Plasma store client. Used for testing purposes only.
/// \param chunk_size The chunk size into which objects are to be split.
ObjectBufferPool(std::shared_ptr<plasma::PlasmaClientInterface> store_client,
ObjectBufferPool(std::shared_ptr<plasma::ObjectStoreClientInterface> store_client,
const uint64_t chunk_size);

~ObjectBufferPool();
Expand Down Expand Up @@ -214,7 +214,7 @@ class ObjectBufferPool {
GUARDED_BY(pool_mutex_);

/// Plasma client pool.
std::shared_ptr<plasma::PlasmaClientInterface> store_client_;
std::shared_ptr<plasma::ObjectStoreClientInterface> store_client_;

/// Determines the maximum chunk size to be transferred by a single thread.
const uint64_t default_chunk_size_;
Expand Down
4 changes: 3 additions & 1 deletion src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ ObjectStoreRunner::ObjectStoreRunner(const ObjectManagerConfig &config,
config.plasma_directory,
config.fallback_directory));
// Initialize object store.
std::map<std::string, std::string> emptyMap;
store_thread_ = std::thread(&plasma::PlasmaStoreRunner::Start,
plasma::plasma_store_runner.get(),
std::ref(emptyMap),
spill_objects_callback,
object_store_full_callback,
add_object_callback,
Expand Down Expand Up @@ -160,7 +162,7 @@ void ObjectManager::Stop() {
}

bool ObjectManager::IsPlasmaObjectSpillable(const ObjectID &object_id) {
return plasma::plasma_store_runner->IsPlasmaObjectSpillable(object_id);
return plasma::plasma_store_runner->IsObjectSpillable(object_id);
}

void ObjectManager::RunRpcService(int index) {
Expand Down
156 changes: 29 additions & 127 deletions src/ray/object_manager/plasma/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,125 +25,14 @@
#include "ray/common/buffer.h"
#include "ray/common/status.h"
#include "ray/object_manager/plasma/common.h"
#include "ray/object_manager/plasma/object_store_client_interface.h"
#include "ray/util/visibility.h"
#include "src/ray/protobuf/common.pb.h"

namespace plasma {

using ray::Buffer;
using ray::SharedMemoryBuffer;
using ray::Status;

/// Object buffer data structure.
struct ObjectBuffer {
/// The data buffer.
std::shared_ptr<SharedMemoryBuffer> data;
/// The metadata buffer.
std::shared_ptr<SharedMemoryBuffer> metadata;
/// The device number.
int device_num;
};

class PlasmaClientInterface {
public:
virtual ~PlasmaClientInterface(){};

/// Tell Plasma that the client no longer needs the object. This should be
/// called after Get() or Create() when the client is done with the object.
/// After this call, the buffer returned by Get() is no longer valid.
///
/// \param object_id The ID of the object that is no longer needed.
/// \return The return status.
virtual Status Release(const ObjectID &object_id) = 0;

/// Disconnect from the local plasma instance, including the local store and
/// manager.
///
/// \return The return status.
virtual Status Disconnect() = 0;

/// Get some objects from the Plasma Store. This function will block until the
/// objects have all been created and sealed in the Plasma Store or the
/// timeout expires.
///
/// If an object was not retrieved, the corresponding metadata and data
/// fields in the ObjectBuffer structure will evaluate to false.
/// Objects are automatically released by the client when their buffers
/// get out of scope.
///
/// \param object_ids The IDs of the objects to get.
/// \param timeout_ms The amount of time in milliseconds to wait before this
/// request times out. If this value is -1, then no timeout is set.
/// \param[out] object_buffers The object results.
/// \param is_from_worker Whether or not if the Get request comes from a Ray workers.
/// \return The return status.
virtual Status Get(const std::vector<ObjectID> &object_ids,
int64_t timeout_ms,
std::vector<ObjectBuffer> *object_buffers,
bool is_from_worker) = 0;

/// Seal an object in the object store. The object will be immutable after
/// this
/// call.
///
/// \param object_id The ID of the object to seal.
/// \return The return status.
virtual Status Seal(const ObjectID &object_id) = 0;

/// Abort an unsealed object in the object store. If the abort succeeds, then
/// it will be as if the object was never created at all. The unsealed object
/// must have only a single reference (the one that would have been removed by
/// calling Seal).
///
/// \param object_id The ID of the object to abort.
/// \return The return status.
virtual Status Abort(const ObjectID &object_id) = 0;

/// Create an object in the Plasma Store. Any metadata for this object must be
/// be passed in when the object is created.
///
/// If this request cannot be fulfilled immediately, this call will block until
/// enough objects have been spilled to make space. If spilling cannot free
/// enough space, an out of memory error will be returned.
///
/// \param object_id The ID to use for the newly created object.
/// \param owner_address The address of the object's owner.
/// \param data_size The size in bytes of the space to be allocated for this
/// object's
/// data (this does not include space used for metadata).
/// \param metadata The object's metadata. If there is no metadata, this
/// pointer should be NULL.
/// \param metadata_size The size in bytes of the metadata. If there is no
/// metadata, this should be 0.
/// \param data The address of the newly created object will be written here.
/// \param device_num The number of the device where the object is being
/// created.
/// device_num = 0 corresponds to the host,
/// device_num = 1 corresponds to GPU0,
/// device_num = 2 corresponds to GPU1, etc.
/// \return The return status.
///
/// The returned object must be released once it is done with. It must also
/// be either sealed or aborted.
virtual Status CreateAndSpillIfNeeded(const ObjectID &object_id,
const ray::rpc::Address &owner_address,
int64_t data_size,
const uint8_t *metadata,
int64_t metadata_size,
std::shared_ptr<Buffer> *data,
plasma::flatbuf::ObjectSource source,
int device_num = 0) = 0;

/// Delete a list of objects from the object store. This currently assumes that the
/// object is present, has been sealed and not used by another client. Otherwise,
/// it is a no operation.
///
/// \param object_ids The list of IDs of the objects to delete.
/// \return The return status. If all the objects are non-existent, return OK.
virtual Status Delete(const std::vector<ObjectID> &object_ids) = 0;
};
namespace plasma {

class PlasmaClient : public PlasmaClientInterface {
class PlasmaClient : public ObjectStoreClientInterface{
public:
PlasmaClient();
~PlasmaClient();
Expand All @@ -163,7 +52,7 @@ class PlasmaClient : public PlasmaClientInterface {
Status Connect(const std::string &store_socket_name,
const std::string &manager_socket_name = "",
int release_delay = 0,
int num_retries = -1);
int num_retries = -1) override;

/// Create an object in the Plasma Store. Any metadata for this object must be
/// be passed in when the object is created.
Expand Down Expand Up @@ -198,7 +87,7 @@ class PlasmaClient : public PlasmaClientInterface {
int64_t metadata_size,
std::shared_ptr<Buffer> *data,
plasma::flatbuf::ObjectSource source,
int device_num = 0);
int device_num = 0) override;

/// Create an object in the Plasma Store. Any metadata for this object must be
/// be passed in when the object is created.
Expand Down Expand Up @@ -233,7 +122,7 @@ class PlasmaClient : public PlasmaClientInterface {
int64_t metadata_size,
std::shared_ptr<Buffer> *data,
plasma::flatbuf::ObjectSource source,
int device_num = 0);
int device_num = 0) override;

/// Get some objects from the Plasma Store. This function will block until the
/// objects have all been created and sealed in the Plasma Store or the
Expand All @@ -253,15 +142,15 @@ class PlasmaClient : public PlasmaClientInterface {
Status Get(const std::vector<ObjectID> &object_ids,
int64_t timeout_ms,
std::vector<ObjectBuffer> *object_buffers,
bool is_from_worker);
bool is_from_worker) override;

/// Tell Plasma that the client no longer needs the object. This should be
/// called after Get() or Create() when the client is done with the object.
/// After this call, the buffer returned by Get() is no longer valid.
///
/// \param object_id The ID of the object that is no longer needed.
/// \return The return status.
Status Release(const ObjectID &object_id);
Status Release(const ObjectID &object_id) override;

/// Check if the object store contains a particular object and the object has
/// been sealed. The result will be stored in has_object.
Expand All @@ -273,7 +162,7 @@ class PlasmaClient : public PlasmaClientInterface {
/// \param has_object The function will write true at this address if
/// the object is present and false if it is not present.
/// \return The return status.
Status Contains(const ObjectID &object_id, bool *has_object);
Status Contains(const ObjectID &object_id, bool *has_object) override;

/// Abort an unsealed object in the object store. If the abort succeeds, then
/// it will be as if the object was never created at all. The unsealed object
Expand All @@ -282,15 +171,15 @@ class PlasmaClient : public PlasmaClientInterface {
///
/// \param object_id The ID of the object to abort.
/// \return The return status.
Status Abort(const ObjectID &object_id);
Status Abort(const ObjectID &object_id) override;

/// Seal an object in the object store. The object will be immutable after
/// this
/// call.
///
/// \param object_id The ID of the object to seal.
/// \return The return status.
Status Seal(const ObjectID &object_id);
Status Seal(const ObjectID &object_id) override;

/// Delete an object from the object store. This currently assumes that the
/// object is present, has been sealed and not used by another client. Otherwise,
Expand All @@ -309,7 +198,7 @@ class PlasmaClient : public PlasmaClientInterface {
///
/// \param object_ids The list of IDs of the objects to delete.
/// \return The return status. If all the objects are non-existent, return OK.
Status Delete(const std::vector<ObjectID> &object_ids);
Status Delete(const std::vector<ObjectID> &object_ids) override;

/// Delete objects until we have freed up num_bytes bytes or there are no more
/// released objects that can be deleted.
Expand All @@ -318,23 +207,36 @@ class PlasmaClient : public PlasmaClientInterface {
/// \param num_bytes_evicted Out parameter for total number of bytes of space
/// retrieved.
/// \return The return status.
Status Evict(int64_t num_bytes, int64_t &num_bytes_evicted);
Status Evict(int64_t num_bytes, int64_t &num_bytes_evicted) override;

/// Disconnect from the local plasma instance, including the local store and
/// manager.
///
/// \return The return status.
Status Disconnect();
Status Disconnect() override;

/// Get the current debug string from the plasma store server.
///
/// \return The debug string.
std::string DebugString();
std::string DebugString() override;

/// Get the memory capacity of the store.
///
/// \return Memory capacity of the store in bytes.
int64_t store_capacity();
int64_t store_capacity() override;

Status Authenticate(const std::string& user,
const std::string& passwd) override {
return Status::OK();
};

Status Authenticate(const std::string& secret) override {
return Status::OK();
};

void MemCpy(void* dest, void* src, size_t len) override {
memcpy(dest, src, len);
};

private:
/// Retry a previous create call using the returned request ID.
Expand Down
Loading