diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index d26d2e42..d26d1614 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -13,9 +13,9 @@ jobs: - run: echo "The job is automatically triggered by a ${{github.event_name}} event." - name: "Install APT packages" run: > - sudo apt update; + sudo apt update; sudo apt install libssl-dev librdmacm-dev libibverbs-dev libspdlog-dev -y; - sudo apt install libboost-all-dev ragel python3.10 python3-pip -y + sudo apt install libreadline-dev libboost-all-dev ragel python3 python3-pip -y - run: g++ --version - run: cmake --version - run: lscpu diff --git a/.github/workflows/build_use_zerocopy_delta_api.yml b/.github/workflows/build_use_zerocopy_delta_api.yml index e37c467d..96c213c5 100644 --- a/.github/workflows/build_use_zerocopy_delta_api.yml +++ b/.github/workflows/build_use_zerocopy_delta_api.yml @@ -13,9 +13,9 @@ jobs: - run: echo "The job is automatically triggered by a ${{github.event_name}} event." - name: "Install APT packages" run: > - sudo apt update; + sudo apt update; sudo apt install libssl-dev librdmacm-dev libibverbs-dev libspdlog-dev -y; - sudo apt install libboost-all-dev ragel python3.10 python3-pip -y + sudo apt install libreadline-dev libboost-all-dev ragel python3 python3-pip -y - run: g++ --version - run: cmake --version - run: lscpu diff --git a/include/cascade/cascade_interface.hpp b/include/cascade/cascade_interface.hpp index 4b1df133..614ffe81 100644 --- a/include/cascade/cascade_interface.hpp +++ b/include/cascade/cascade_interface.hpp @@ -116,8 +116,22 @@ class ICascadeStore { */ virtual void put_and_forget(const VT& value, bool as_trigger) const = 0; -#ifdef ENABLE_EVALUATION + /** + * @brief oob_send + * + * @param[in] data_addr Local memory address of data to write to remote node + * @param[in] gpu_addr Remote address to write to + * @param[in] rkey Access key to the remote memory + * @param[in] size The size of the remote allocated memory + */ + virtual bool oob_send(uint64_t data_addr,uint64_t gpu_addr, uint64_t rkey,size_t size) const { + dbg_default_warn("Calling unsupported func:{}", __PRETTY_FUNCTION__); + return false; + } + +#ifdef ENABLE_EVALUATION + /** * @brief A function to evaluate the performance of an internal shard * * @param[in] max_payload_size The maximum size of the payload. diff --git a/include/cascade/detail/service_impl.hpp b/include/cascade/detail/service_impl.hpp index 82624c62..831bdf7e 100644 --- a/include/cascade/detail/service_impl.hpp +++ b/include/cascade/detail/service_impl.hpp @@ -664,6 +664,31 @@ derecho::rpc::QueryResults ServiceClient::put( return this->template type_recursive_put(subgroup_type_index,value,subgroup_index,shard_index,as_trigger); } +template +template +void ServiceClient::oob_get_remote(const node_id_t& node_id, uint32_t subgroup_index, uint64_t data_addr, uint64_t landing_addr, uint64_t rkey, size_t size){ + + if (!is_external_client()) { + auto& subgroup_handle = group_ptr->template get_subgroup(subgroup_index); + subgroup_handle.template p2p_send(node_id,data_addr, landing_addr, rkey, size); + std::cout << "SENT P2P OOB SEND RPC CALL to node of id:" << node_id << std::endl; + + } +} + +template +void ServiceClient::oob_register_mem_ex(void* addr, size_t size, const memory_attribute_t& attr) { + group_ptr->register_oob_memory_ex(addr, size, attr); +} + +template +void ServiceClient::oob_deregister_mem(void* addr) { + group_ptr->deregister_oob_memory(addr); +} +template +uint64_t ServiceClient::oob_rkey(void* addr){ + return group_ptr->get_oob_memory_key(addr); +} template template void ServiceClient::put_and_forget( diff --git a/include/cascade/detail/volatile_store_impl.hpp b/include/cascade/detail/volatile_store_impl.hpp index 69e90c8a..1a2907b1 100644 --- a/include/cascade/detail/volatile_store_impl.hpp +++ b/include/cascade/detail/volatile_store_impl.hpp @@ -85,6 +85,23 @@ double internal_perf_put(derecho::Replicated& subgroup_handle, cons return (num_messages_sent)*1e9 / (now_ns - start_ns); } +template +bool VolatileCascadeStore::oob_send(uint64_t data_addr, uint64_t gpu_addr, uint64_t rkey, size_t size) const{ + // STEP 2 - do RDMA write to send the OOB data + dbg_default_debug("called oob_send with, data_addr={}, gpu_addr={}, rkey={}, size={}", data_addr, gpu_addr, rkey, size); + auto& subgroup_handle = group->template get_subgroup(this->subgroup_index); + struct iovec iov; + iov.iov_base = reinterpret_cast(data_addr); iov.iov_len = static_cast(size); + subgroup_handle.oob_remote_write(group->get_rpc_caller_id(),&iov,1,gpu_addr,rkey,size); + dbg_default_debug("Finished ASYNC oob remote write Derecho call"); + subgroup_handle.wait_for_oob_op(group->get_rpc_caller_id(),OOB_OP_WRITE,1000); + dbg_default_debug("FINISHED OOB REMOTE WRITE"); + + std::cout << "FINISHED OOB Remote Write" << std::endl; + return true; + +} + template double VolatileCascadeStore::perf_put(const uint32_t max_payload_size, const uint64_t duration_sec) const { debug_enter_func_with_args("max_payload_size={},duration_sec={}", max_payload_size, duration_sec); diff --git a/include/cascade/service.hpp b/include/cascade/service.hpp index 63190fd6..a9652dec 100644 --- a/include/cascade/service.hpp +++ b/include/cascade/service.hpp @@ -768,8 +768,23 @@ namespace cascade { */ template derecho::rpc::QueryResults put(const ObjectType& object, bool as_trigger = false); - - /** + + /** + * @param[in] node_id Node_id of the node that we want to execute a GPU-direct RDMA write an object + * @param[in] data_addr The address of the data on the node specified by node_id + * @param[in] gpu_addr The address of the allocated memory region (Starting address where the data will be written to during the one-sided RDMA write) + * @param[in] rkey The access key for allocated memory + * @param[in] size The size of the allocated memory region + */ + template + void oob_get_remote(const node_id_t& node_id, uint32_t subgroup_index, const uint64_t data_addr, uint64_t landing_addr, uint64_t rkey, size_t size); + + void oob_register_mem_ex(void* addr, size_t size, const memory_attribute_t& attr); + + void oob_deregister_mem(void* addr); + + uint64_t oob_rkey(void* addr); + /** * "put_and_forget" writes an object to a given subgroup/shard, but no return value. * * @param[in] object the object to write. diff --git a/include/cascade/volatile_store.hpp b/include/cascade/volatile_store.hpp index 6f2d166f..cf8fe909 100644 --- a/include/cascade/volatile_store.hpp +++ b/include/cascade/volatile_store.hpp @@ -61,7 +61,8 @@ class VolatileCascadeStore : public ICascadeStore, multi_get_size, get_size, get_size_by_time, - trigger_put + trigger_put, + oob_send #ifdef ENABLE_EVALUATION , dump_timestamp_log @@ -91,6 +92,7 @@ class VolatileCascadeStore : public ICascadeStore, #endif // ENABLE_EVALUATION virtual void trigger_put(const VT& value) const override; virtual version_tuple put(const VT& value, bool as_trigger) const override; + virtual bool oob_send(uint64_t data_addr, uint64_t gpu_addr, uint64_t rkey, size_t size) const override; #ifdef ENABLE_EVALUATION virtual double perf_put(const uint32_t max_payload_size, const uint64_t duration_sec) const override; #endif // ENABLE_EVALUATION diff --git a/src/service/perftest.cpp b/src/service/perftest.cpp index e7222399..fb0ce87e 100644 --- a/src/service/perftest.cpp +++ b/src/service/perftest.cpp @@ -1150,6 +1150,21 @@ PerfTestServer::~PerfTestServer() { // PerfTestClient implementation // ////////////////////////////////////// +std::ostream& operator<<(std::ostream& os, PutType pt) { + switch(pt) { + case PutType::PUT: + os << "PUT"; + break; + case PutType::PUT_AND_FORGET: + os << "PUT_AND_FORGET"; + break; + case PutType::TRIGGER_PUT: + os << "TRIGGER_PUT"; + break; + } + return os; +} + PerfTestClient::PerfTestClient(ServiceClientAPI& capi):capi(capi) {} void PerfTestClient::add_or_update_server(const std::string& host, uint16_t port) { diff --git a/src/service/perftest.hpp b/src/service/perftest.hpp index e1abc076..db48f6d5 100644 --- a/src/service/perftest.hpp +++ b/src/service/perftest.hpp @@ -1,5 +1,6 @@ #pragma once #include +#include #include #include #include @@ -176,6 +177,8 @@ enum PutType { TRIGGER_PUT // trigger put }; +std::ostream& operator<<(std::ostream& os, PutType pt); + class PerfTestClient { private: std::map,std::unique_ptr<::rpc::client>> connections; @@ -778,3 +781,7 @@ bool PerfTestClient::perf_get_by_time(uint32_t subgroup_index, } } } + +// Formatter boilerplate for the spdlog library +template <> +struct fmt::formatter : fmt::ostream_formatter {}; \ No newline at end of file diff --git a/src/service/service.cpp b/src/service/service.cpp index 9d122354..e343002f 100644 --- a/src/service/service.cpp +++ b/src/service/service.cpp @@ -4,8 +4,41 @@ namespace derecho { namespace cascade { -/** - * cpu/gpu list examples: + +std::ostream& operator<<(std::ostream& stream, const ShardMemberSelectionPolicy& policy) { + switch(policy) { + case ShardMemberSelectionPolicy::FirstMember: + stream << "FirstMember"; + break; + case ShardMemberSelectionPolicy::LastMember: + stream << "LastMember"; + break; + case ShardMemberSelectionPolicy::Random: + stream << "Random"; + break; + case ShardMemberSelectionPolicy::FixedRandom: + stream << "FixedRandom"; + break; + case ShardMemberSelectionPolicy::RoundRobin: + stream << "RoundRobin"; + break; + case ShardMemberSelectionPolicy::KeyHashing: + stream << "KeyHashing"; + break; + case ShardMemberSelectionPolicy::UserSpecified: + stream << "UserSpecified"; + break; + case ShardMemberSelectionPolicy::InvalidPolicy: + default: + stream << "InvalidPolicy"; + break; + } + return stream; +} + + +/** + * cpu/gpu list examples: * cpu_cores = 0,1,2,3 * cpu_cores = 0,1-5,6,8 * cpu_cores = 0-15 diff --git a/src/udl_zoo/python/python_udl.cpp b/src/udl_zoo/python/python_udl.cpp index ebe33719..c7840a32 100644 --- a/src/udl_zoo/python/python_udl.cpp +++ b/src/udl_zoo/python/python_udl.cpp @@ -79,7 +79,7 @@ class PythonOCDPO: public DefaultOffCriticalDataPathObserver { } } -private: +private: /* request type to python thread */ struct python_request_t { enum { @@ -274,7 +274,7 @@ class PythonOCDPO: public DefaultOffCriticalDataPathObserver { res.sequence_num = req.sequence_num; dbg_default_trace("{}:{} [PYTHON] Processing request (type:{} sequence:{})", - __FILE__,__LINE__,req.type,req.sequence_num); + __FILE__,__LINE__,fmt::underlying(req.type),req.sequence_num); switch(req.type) { case python_request_t::TERMINATE: @@ -343,7 +343,7 @@ class PythonOCDPO: public DefaultOffCriticalDataPathObserver { dbg_default_trace("{}:{} calling the handler.", __FILE__,__LINE__); PyObject* ret = PyObject_Call(req.request.execute_ocdpo.handler_ptr,targs,kwargs); if (ret == nullptr) { - dbg_default_error("Exception raised in user application. {}:{}", + dbg_default_error("Exception raised in user application. {}:{}", __FILE__,__LINE__); PyErr_Print(); res.success = false; @@ -351,7 +351,7 @@ class PythonOCDPO: public DefaultOffCriticalDataPathObserver { } else { Py_DECREF(ret); } - + Py_DECREF(kwargs); #ifdef ENABLE_EVALUATION Py_DECREF(py_message_id); @@ -366,7 +366,7 @@ class PythonOCDPO: public DefaultOffCriticalDataPathObserver { Py_DECREF(py_pathname); Py_DECREF(py_sender); Py_DECREF(targs); - + res.success = true; dbg_default_trace("{}:{} User processing function returned.", __FILE__,__LINE__); } @@ -408,9 +408,9 @@ class PythonOCDPO: public DefaultOffCriticalDataPathObserver { if (conf_ptr->contains(PYUDL_CONF_ENTRY_CLASS)) { std::string class_name = (*conf_ptr)[PYUDL_CONF_ENTRY_CLASS].get(); dbg_default_trace("{}:{} create python handler object from class:{}",__FILE__,__LINE__,class_name); - + // we assure py_module will be valid because STEP 2 succeeded. - auto py_module = PythonOCDPO::get_module(module_name.c_str()); + auto py_module = PythonOCDPO::get_module(module_name.c_str()); auto entry_class_type = PyObject_GetAttrString(py_module,class_name.c_str()); if (entry_class_type == nullptr || !PyType_Check(entry_class_type)) { dbg_default_error("Failed loading python udl entry class:{}.{}. {}:{}", @@ -419,7 +419,7 @@ class PythonOCDPO: public DefaultOffCriticalDataPathObserver { res.success = false; break; } - + // test if entry_class_type is a subclass of Type UserDefinedLogical. if (!PythonOCDPO::is_valid_observer_type(reinterpret_cast(entry_class_type))) { dbg_default_error("Error: {} is not a subclass of derecho.cascade.udl.UserDefinedLogic. {}:{}", @@ -427,7 +427,7 @@ class PythonOCDPO: public DefaultOffCriticalDataPathObserver { res.success = false; break; } - + // create object std::string conf_str = to_string(*conf_ptr); auto conf_arg = Py_BuildValue("s",conf_str.c_str()); @@ -441,7 +441,7 @@ class PythonOCDPO: public DefaultOffCriticalDataPathObserver { auto pargs = PyTuple_New(1); PyTuple_SetItem(pargs,0,conf_arg); python_ocdpo = PyObject_Call(entry_class_type,pargs,nullptr); - + Py_DECREF(pargs); // Py_DECREF(conf_arg); <-- don't do this: conf_arg has been 'stolen' by PyTuple_SetItem() if (python_ocdpo == nullptr) { @@ -473,15 +473,15 @@ class PythonOCDPO: public DefaultOffCriticalDataPathObserver { break; } dbg_default_trace("{}:{} ocdpo handler method is created @{:p}", __FILE__,__LINE__,static_cast(python_ocdpo_handler)); - + res.ocdpo = std::make_shared(python_ocdpo,python_ocdpo_handler,dynamic_cast(ctxt)); res.success = true; } break; } - + dbg_default_trace("{}:{} [PYTHON] Finished processing request (type:{} sequence:{}), response.success={}", - __FILE__,__LINE__,req.type,req.sequence_num,res.success); + __FILE__,__LINE__,fmt::underlying(req.type),req.sequence_num,res.success); // notification std::unique_lock res_lock(python_response_mutex); @@ -525,7 +525,7 @@ class PythonOCDPO: public DefaultOffCriticalDataPathObserver { std::unique_lock req_lock(python_request_mutex); request.sequence_num = python_request_sequence_number++; dbg_default_trace("{}:{} posting request (type:{} seq:{})", - __FILE__,__LINE__,request.type,request.sequence_num); + __FILE__,__LINE__,fmt::underlying(request.type),request.sequence_num); python_request_queue.emplace(request); req_lock.unlock(); python_request_cv.notify_one(); @@ -543,7 +543,7 @@ class PythonOCDPO: public DefaultOffCriticalDataPathObserver { python_response_queue.pop(); res_lock.unlock(); dbg_default_trace("{}:{} request(type:{} seq:{}/{}) is responsed.", - __FILE__,__LINE__, request.type, request.sequence_num, response.sequence_num); + __FILE__,__LINE__, fmt::underlying(request.type), request.sequence_num, response.sequence_num); return response; } @@ -773,7 +773,7 @@ class PythonOCDPO: public DefaultOffCriticalDataPathObserver { ,message_id #endif ,blob_wrapper); - + Py_RETURN_NONE; } @@ -819,14 +819,14 @@ PyModuleDef PythonOCDPO::context_module = { nullptr, nullptr, nullptr, nullptr }; -/* - * This will only be called once +/* + * This will only be called once */ void initialize(ICascadeContext* ctxt) { PythonOCDPO::initialize(); } -/* +/* * This will be called for each UDL(PythonOCDPO) instance. */ std::shared_ptr get_observer(