Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions src/feeder/et_feeder.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@
#include "protoio.hh"

namespace Chakra {
struct CompareNodes : public std::binary_function<
std::shared_ptr<ETFeederNode>,
std::shared_ptr<ETFeederNode>,
bool> {
struct CompareNodes {
bool operator()(
const std::shared_ptr<ETFeederNode> lhs,
const std::shared_ptr<ETFeederNode> rhs) const {
Expand Down
101 changes: 101 additions & 0 deletions src/feeder_v3/cache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#ifndef CHAKRA_FEEDER_V3_CACHE_H
#define CHAKRA_FEEDER_V3_CACHE_H

#include <list>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>

namespace Chakra {
namespace FeederV3 {

template <typename K, typename V>
class Cache {
public:
Cache(size_t capacity) : capacity(capacity) {}
void put(const K& key, const V& value) {
std::unique_lock lock(cache_mutex);
if (this->cache.find(key) != this->cache.end()) {
// hit and update
this->lru.erase(this->cache[key].second);
this->lru.push_back(key);
this->cache[key].second = --this->lru.end();
this->cache[key].first = std::make_shared<V>(value);
} else {
// miss
while (this->cache.size() >= this->capacity) {
// evict
auto victim = this->lru.front();
this->cache.erase(victim);
this->lru.pop_front();
}
// and put new
this->lru.push_back(key);
this->cache[key] =
std::make_pair(std::make_shared<V>(value), --this->lru.end());
}
}
bool has(const K& key) {
std::shared_lock lock(cache_mutex);
return this->cache.find(key) != this->cache.end();
}
std::weak_ptr<const V> get(const K& key) {
std::shared_lock lock(cache_mutex);
if (this->cache.find(key) == this->cache.end()) {
throw std::runtime_error("Key not found in cache");
}
std::weak_ptr<const V> value(this->cache.at(key).first);
return value;
}
std::shared_ptr<const V> get_locked(const K& key) {
std::shared_lock lock(cache_mutex);
if (this->cache.find(key) == this->cache.end()) {
throw std::runtime_error("Key not found in cache");
}
return this->cache.at(key).first;
}
std::weak_ptr<const V> get_or_null(const K& key) {
std::shared_lock lock(cache_mutex);
if (this->cache.find(key) == this->cache.end()) {
return std::weak_ptr<V>();
}
std::weak_ptr<const V> value(this->cache.at(key).first);
return value;
}
std::shared_ptr<const V> get_or_null_locked(const K& key) {
std::shared_lock lock(cache_mutex);
if (this->cache.find(key) == this->cache.end()) {
return std::shared_ptr<const V>();
}
return this->cache.at(key).first;
}

void remove(const K& key) {
std::unique_lock lock(cache_mutex);
if (this->cache.find(key) == this->cache.end()) {
throw std::runtime_error("Key not found in cache");
}
this->lru.erase(this->cache[key].second);
this->cache.erase(key);
}

~Cache() {
this->cache.clear();
this->lru.clear();
}

private:
size_t capacity;
std::unordered_map<
K,
std::pair<std::shared_ptr<V>, typename std::list<K>::iterator>>
cache;
std::list<K> lru;
mutable std::shared_mutex cache_mutex;
};

} // namespace FeederV3
} // namespace Chakra

#endif
32 changes: 32 additions & 0 deletions src/feeder_v3/common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#ifndef CHAKRA_FEEDER_V3_COMMON_H
#define CHAKRA_FEEDER_V3_COMMON_H
#include <cstdint>
#include "et_def.pb.h"

namespace Chakra {
namespace FeederV3 {
using NodeId = uint64_t;
using ETFeederId = uint64_t;
using ChakraNode = ChakraProtoMsg::Node;
using ChakraGlobalMetadata = ChakraProtoMsg::GlobalMetadata;
using ChakraAttr = ChakraProtoMsg::AttributeProto;

constexpr static bool ALLOW_IMPLICIT_INTEGER_CONVERSION = true;
constexpr static bool ALLOW_IMPLICIT_FLOAT_CONVERSION = true;
constexpr static bool ALLOW_IMPLICIT_INTEGER_TO_FLOAT_CONVERSION = true;
constexpr static bool ALLOW_IMPLICIT_FLOAT_TO_INTEGER_CONVERSION = false;
constexpr static bool NO_IMPLICIT_CONVERSION = false;
constexpr static bool DEFAULT_STRICT_TYPING = false;

constexpr static size_t DEFAULT_ETFEEDER_CACHE_SIZE = 16384;
constexpr static bool RESOLVE_DATA_DEPS = true;
constexpr static bool RESOLVE_CTRL_DEPS = false;

constexpr static size_t DEFAULT_PROTOBUF_BUFFER_SIZE = 16384;

constexpr static bool SOFT_SANITY_CHECK = true;

} // namespace FeederV3
} // namespace Chakra

#endif
233 changes: 233 additions & 0 deletions src/feeder_v3/dependancy_solver.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
#include "dependancy_solver.h"
#include <mutex>
#include <shared_mutex>
#include <stdexcept>
#include <unordered_map>
#include <unordered_set>

using namespace Chakra::FeederV3;

void _DependancyLayer::add_node(
const NodeId& node,
const std::unordered_set<NodeId>& parents) {
std::unique_lock<std::shared_mutex> lock(this->mutex);
this->dirty = true;
this->_helper_allocate_bucket(node);
for (auto& parent : parents) {
this->_helper_allocate_bucket(parent);
this->child_map_parent[node].insert(parent);
this->parent_map_child[parent].insert(node);
}
}

void _DependancyLayer::add_node_children(
const NodeId& node,
const std::unordered_set<NodeId>& children) {
std::unique_lock<std::shared_mutex> lock(this->mutex);
this->dirty = true;
this->_helper_allocate_bucket(node);
for (auto& child : children) {
this->_helper_allocate_bucket(child);
this->child_map_parent[child].insert(node);
this->parent_map_child[node].insert(child);
}
}

void _DependancyLayer::take_node(const NodeId& node) {
std::unique_lock<std::shared_mutex> lock(this->mutex);
if (this->dirty) {
throw std::runtime_error(
"dependancy layer is dirty, resolve_dependancy_free_nodes should be called first");
}
if (this->dependancy_free_nodes.find(node) ==
this->dependancy_free_nodes.end()) {
const auto& parents = this->child_map_parent[node];
throw std::runtime_error(
"Node " + std::to_string(node) +
" is not dependancy free or already taken/released");
}
if (this->ongoing_nodes.find(node) != this->ongoing_nodes.end()) {
throw std::runtime_error("Node is already taken");
}
this->ongoing_nodes.insert(node);
this->dependancy_free_nodes.erase(node);
}

void _DependancyLayer::finish_node(const NodeId& node) {
std::unique_lock<std::shared_mutex> lock(this->mutex);
if (this->dirty) {
throw std::runtime_error(
"dependancy layer is dirty, resolve_dependancy_free_nodes should be called first");
}
if (this->ongoing_nodes.find(node) == this->ongoing_nodes.end()) {
throw std::runtime_error("Node is not taken");
}
this->ongoing_nodes.erase(node);
for (auto& child : this->parent_map_child[node]) {
if (this->child_map_parent[child].find(node) ==
this->child_map_parent[child].end()) {
// This should not happen, but sanity check
throw std::runtime_error(
"Parent map child is not consistent with child map parent");
}
this->child_map_parent[child].erase(node);
if (this->child_map_parent[child].empty()) {
this->dependancy_free_nodes.insert(child);
}
}
this->child_map_parent.erase(node);
this->parent_map_child.erase(node);
}

void _DependancyLayer::push_back_node(const NodeId& node) {
std::unique_lock<std::shared_mutex> lock(this->mutex);
if (this->dirty) {
throw std::runtime_error(
"dependancy layer is dirty, resolve_dependancy_free_nodes should be called first");
}
if (this->ongoing_nodes.find(node) == this->ongoing_nodes.end()) {
throw std::runtime_error("Node is not taken");
}
this->ongoing_nodes.erase(node);
this->dependancy_free_nodes.insert(node);
}

void _DependancyLayer::resolve_dependancy_free_nodes() {
std::unique_lock<std::shared_mutex> lock(this->mutex);
if ((!this->dependancy_free_nodes.empty()) || (!this->ongoing_nodes.empty()))
throw std::runtime_error(
"resolve_dependancy_free_nodes after initialization is not supported yet!");
for (auto& it : this->child_map_parent) {
auto& node = it.first;
auto& parents = it.second;
if (parents.empty())
this->dependancy_free_nodes.insert(node);
}
if (this->dependancy_free_nodes.empty())
throw std::runtime_error(
"No dependancy free nodes found, there might be deadlocks");
this->dirty = false;
}

const std::unordered_set<NodeId>& _DependancyLayer::get_dependancy_free_nodes()
const {
return this->dependancy_free_nodes;
}

const std::unordered_set<NodeId>& _DependancyLayer::get_children(
NodeId node) const {
const auto& results = this->parent_map_child.at(node);
return this->parent_map_child.at(node);
}

const std::unordered_set<NodeId>& _DependancyLayer::get_parents(
NodeId node) const {
return this->child_map_parent.at(node);
}

const std::unordered_set<NodeId>& _DependancyLayer::get_ongoing_nodes() const {
return this->ongoing_nodes;
}

void _DependancyLayer::_helper_allocate_bucket(NodeId node_id) {
if (this->child_map_parent.find(node_id) == this->child_map_parent.end()) {
this->child_map_parent[node_id] = std::unordered_set<NodeId>();
}
if (this->parent_map_child.find(node_id) == this->parent_map_child.end()) {
this->parent_map_child[node_id] = std::unordered_set<NodeId>();
}
}

void DependancyResolver::add_node(const ChakraNode& node) {
NodeId node_id = node.id();
std::unordered_set<NodeId> parents, enabled_parents;
// hotfix: instead of ignroe, complete dont handle data dep if not enabled
if ((SOFT_SANITY_CHECK && this->enable_data_deps) || (!SOFT_SANITY_CHECK)) {
for (auto& parent : node.data_deps()) {
if (this->enable_data_deps)
enabled_parents.insert(parent);
parents.insert(parent);
}
this->data_dependancy.add_node(node_id, parents);
parents.clear();
}

// hotfix: instead of ignroe, complete dont handle data dep if not enabled
if ((SOFT_SANITY_CHECK && this->enable_ctrl_deps) || (!SOFT_SANITY_CHECK)) {
for (auto& parent : node.ctrl_deps()) {
if (this->enable_ctrl_deps)
enabled_parents.insert(parent);
parents.insert(parent);
}
this->ctrl_dependancy.add_node(node_id, parents);
parents.clear();
}

this->enabled_dependancy.add_node(node_id, enabled_parents);
}

void DependancyResolver::take_node(const NodeId& node) {
if ((SOFT_SANITY_CHECK && this->enable_data_deps) || (!SOFT_SANITY_CHECK))
this->data_dependancy.take_node(node);
if ((SOFT_SANITY_CHECK && this->enable_ctrl_deps) || (!SOFT_SANITY_CHECK))
this->ctrl_dependancy.take_node(node);
this->enabled_dependancy.take_node(node);
}

void DependancyResolver::push_back_node(const NodeId& node) {
if ((SOFT_SANITY_CHECK && this->enable_data_deps) || (!SOFT_SANITY_CHECK))
this->data_dependancy.push_back_node(node);
if ((SOFT_SANITY_CHECK && this->enable_ctrl_deps) || (!SOFT_SANITY_CHECK))
this->ctrl_dependancy.push_back_node(node);
this->enabled_dependancy.push_back_node(node);
}

void DependancyResolver::finish_node(const NodeId& node) {
if ((SOFT_SANITY_CHECK && this->enable_data_deps) || (!SOFT_SANITY_CHECK))
this->data_dependancy.finish_node(node);
if ((SOFT_SANITY_CHECK && this->enable_ctrl_deps) || (!SOFT_SANITY_CHECK))
this->ctrl_dependancy.finish_node(node);
this->enabled_dependancy.finish_node(node);
}

void DependancyResolver::resolve_dependancy_free_nodes() {
if ((SOFT_SANITY_CHECK && this->enable_data_deps) || (!SOFT_SANITY_CHECK))
this->data_dependancy.resolve_dependancy_free_nodes();
if ((SOFT_SANITY_CHECK && this->enable_ctrl_deps) || (!SOFT_SANITY_CHECK))
this->ctrl_dependancy.resolve_dependancy_free_nodes();
this->enabled_dependancy.resolve_dependancy_free_nodes();
}

const std::unordered_set<NodeId>& DependancyResolver::
get_dependancy_free_nodes() const {
return this->enabled_dependancy.get_dependancy_free_nodes();
}

const std::unordered_set<NodeId>& DependancyResolver::get_ongoing_nodes()
const {
return this->enabled_dependancy.get_ongoing_nodes();
}

const _DependancyLayer& DependancyResolver::get_data_dependancy() const {
return this->data_dependancy;
}

const _DependancyLayer& DependancyResolver::get_ctrl_dependancy() const {
return this->ctrl_dependancy;
}

const _DependancyLayer& DependancyResolver::get_enabled_dependancy() const {
return this->enabled_dependancy;
}

_DependancyLayer& DependancyResolver::get_data_dependancy_mut() {
return this->data_dependancy;
}

_DependancyLayer& DependancyResolver::get_ctrl_dependancy_mut() {
return this->ctrl_dependancy;
}

_DependancyLayer& DependancyResolver::get_enabled_dependancy_mut() {
return this->enabled_dependancy;
}
Loading