diff --git a/Makefile.am b/Makefile.am index 2cdbc5d..c7b5e9c 100644 --- a/Makefile.am +++ b/Makefile.am @@ -73,6 +73,7 @@ noinst_HEADERS+= node_prog/base_classes.h \ node_prog/reach_program.h \ node_prog/traverse_with_props.h \ node_prog/discover_paths.h \ + node_prog/discover_all_paths.h \ node_prog/get_btc_block.h \ common/cache_constants.h \ common/config_constants.h \ @@ -145,6 +146,7 @@ weaver_timestamper_SOURCES= common/comm_wrapper.cc \ node_prog/read_node_props_program.cc \ node_prog/traverse_with_props.cc \ node_prog/discover_paths.cc \ + node_prog/discover_all_paths.cc \ node_prog/get_btc_block.cc \ db/element.cc \ db/property.cc \ @@ -219,6 +221,7 @@ weaver_shard_SOURCES= common/ids.cc \ node_prog/read_node_props_program.cc \ node_prog/traverse_with_props.cc \ node_prog/discover_paths.cc \ + node_prog/discover_all_paths.cc \ node_prog/get_btc_block.cc \ db/hyper_stub.cc \ db/queue_manager.cc \ @@ -264,6 +267,7 @@ libweaverclient_la_SOURCES= common/ids.cc \ node_prog/two_neighborhood_program.cc \ node_prog/traverse_with_props.cc \ node_prog/discover_paths.cc \ + node_prog/discover_all_paths.cc \ node_prog/get_btc_block.cc \ db/element.cc \ db/property.cc \ diff --git a/bindings/python/client.pyx b/bindings/python/client.pyx index 2dd87b4..2e919c9 100644 --- a/bindings/python/client.pyx +++ b/bindings/python/client.pyx @@ -379,6 +379,16 @@ cdef extern from 'node_prog/discover_paths.h' namespace 'node_prog': remote_node prev_node node_handle_t src +cdef extern from 'node_prog/discover_all_paths.h' namespace 'node_prog': + cdef cppclass discover_all_paths_params: + discover_all_paths_params() + node_handle_t dest + uint32_t path_len + vector[prop_predicate] node_preds + vector[prop_predicate] edge_preds + unordered_map[string, vector[edge]] paths + remote_node prev_node + cdef extern from 'node_prog/get_btc_block.h' namespace 'node_prog': cdef cppclass get_btc_block_params: get_btc_block_params() @@ -426,6 +436,7 @@ cdef extern from 'client/client.h' namespace 'cl': weaver_client_returncode node_get_program(vector[pair[string, node_get_params]] &initial_args, node_get_params&) nogil weaver_client_returncode traverse_props_program(vector[pair[string, traverse_props_params]] &initial_args, traverse_props_params&) nogil weaver_client_returncode discover_paths_program(vector[pair[string, discover_paths_params]] &initial_args, discover_paths_params&) nogil + weaver_client_returncode discover_all_paths_program(vector[pair[string, discover_all_paths_params]] &initial_args, discover_all_paths_params&) nogil weaver_client_returncode get_btc_block_program(vector[pair[string, get_btc_block_params]] &initial_args, get_btc_block_params&) nogil weaver_client_returncode start_migration() weaver_client_returncode single_stream_migration() @@ -998,6 +1009,49 @@ cdef class Client: ret_paths[cur_node] = cur_edges inc(path_iter) return ret_paths + def discover_all_paths(self, start_node, end_node, path_len=None, node_preds=None, edge_preds=None): + cdef vector[pair[string, discover_all_paths_params]] c_args + cdef pair[string, discover_all_paths_params] arg_pair + arg_pair.first = start_node + arg_pair.second.prev_node = coordinator + arg_pair.second.dest = end_node + # arg_pair.second.src = start_node + if path_len is not None: + arg_pair.second.path_len = path_len + cdef prop_predicate pred_c + if node_preds is not None: + arg_pair.second.node_preds.reserve(len(node_preds)) + for pred in node_preds: + self.__convert_pred_to_c_pred(pred, pred_c) + arg_pair.second.node_preds.push_back(pred_c) + if edge_preds is not None: + arg_pair.second.edge_preds.reserve(len(edge_preds)) + for pred in edge_preds: + self.__convert_pred_to_c_pred(pred, pred_c) + arg_pair.second.edge_preds.push_back(pred_c) + c_args.push_back(arg_pair) + + cdef discover_all_paths_params c_rp + with nogil: + code = self.thisptr.discover_all_paths_program(c_args, c_rp) + + if code != WEAVER_CLIENT_SUCCESS: + raise WeaverError(code, 'node prog error') + + ret_paths = {} + cdef unordered_map[string, vector[edge]].iterator path_iter = c_rp.paths.begin() + cdef vector[edge].iterator edge_iter + while path_iter != c_rp.paths.end(): + cur_node = str(deref(path_iter).first) + cur_edges = [] + edge_iter = deref(path_iter).second.begin() + while edge_iter != deref(path_iter).second.end(): + cur_edges.append(Edge()) + self.__convert_edge_to_client_edge(deref(edge_iter), cur_edges[-1]) + inc(edge_iter) + ret_paths[cur_node] = cur_edges + inc(path_iter) + return ret_paths def get_btc_block(self, block): cdef vector[pair[string, get_btc_block_params]] c_args diff --git a/client/client.cc b/client/client.cc index 85cea36..49eeb81 100644 --- a/client/client.cc +++ b/client/client.cc @@ -526,6 +526,12 @@ client :: discover_paths_program(std::vector> &initial_args, node_prog::discover_all_paths_params &return_param) +{ + SPECIFIC_NODE_PROG(node_prog::DISCOVER_ALL_PATHS); +} + weaver_client_returncode client :: get_btc_block_program(std::vector> &initial_args, node_prog::get_btc_block_params &return_param) { diff --git a/client/client.h b/client/client.h index 1b62769..792b15f 100644 --- a/client/client.h +++ b/client/client.h @@ -37,6 +37,7 @@ #include "node_prog/node_get_program.h" #include "node_prog/traverse_with_props.h" #include "node_prog/discover_paths.h" +#include "node_prog/discover_all_paths.h" #include "node_prog/get_btc_block.h" namespace cl @@ -99,6 +100,7 @@ namespace cl weaver_client_returncode node_get_program(std::vector> &initial_args, node_prog::node_get_params&); weaver_client_returncode traverse_props_program(std::vector> &initial_args, node_prog::traverse_props_params&); weaver_client_returncode discover_paths_program(std::vector> &initial_args, node_prog::discover_paths_params&); + weaver_client_returncode discover_all_paths_program(std::vector> &initial_args, node_prog::discover_all_paths_params&); weaver_client_returncode get_btc_block_program(std::vector> &initial_args, node_prog::get_btc_block_params&); weaver_client_returncode start_migration(); diff --git a/node_prog/discover_all_paths.cc b/node_prog/discover_all_paths.cc new file mode 100644 index 0000000..3a5253a --- /dev/null +++ b/node_prog/discover_all_paths.cc @@ -0,0 +1,361 @@ +/* + * =============================================================== + * Description: Discover all paths between two vertices + * predicated on max path len, node properties, edge + * properties. (Note that the other `discover_paths` node + * program could ignore some paths while searching, + * although it offers better time complexity) + * + * Author: Ted Yin, ted.sybil@gmail.com + * + * Copyright (C) 2015, Cornell University, see the LICENSE file + * for licensing agreement + * =============================================================== + */ + +#define weaver_debug_ +#include "common/stl_serialization.h" +#include "node_prog/node_prog_type.h" +#include "node_prog/discover_all_paths.h" + +using node_prog::search_type; +using node_prog::discover_all_paths_params; +using node_prog::discover_all_paths_state; +using node_prog::discover_all_paths_substate; +using node_prog::cache_response; +using node_prog::Cache_Value_Base; + +// params +discover_all_paths_params :: discover_all_paths_params() + : path_len(UINT32_MAX) + , path_hash(0) + , prev_path_hash(0) + , returning(false) +{ } + +uint64_t +discover_all_paths_params :: size() const +{ + return message::size(dest) + + message::size(path_len) + + message::size(node_preds) + + message::size(edge_preds) + + message::size(ancestors) + + message::size(path_id) + + message::size(path_hash) + + message::size(prev_path_hash) + + message::size(paths) + + message::size(returning) + + message::size(prev_node) + ; +} + +void +discover_all_paths_params :: pack(e::packer &packer) const +{ + message::pack_buffer(packer, dest); + message::pack_buffer(packer, path_len); + message::pack_buffer(packer, node_preds); + message::pack_buffer(packer, edge_preds); + message::pack_buffer(packer, ancestors); + message::pack_buffer(packer, path_id); + message::pack_buffer(packer, path_hash); + message::pack_buffer(packer, prev_path_hash); + message::pack_buffer(packer, paths); + message::pack_buffer(packer, returning); + message::pack_buffer(packer, prev_node); +} + +void +discover_all_paths_params :: unpack(e::unpacker &unpacker) +{ + message::unpack_buffer(unpacker, dest); + message::unpack_buffer(unpacker, path_len); + message::unpack_buffer(unpacker, node_preds); + message::unpack_buffer(unpacker, edge_preds); + message::unpack_buffer(unpacker, ancestors); + message::unpack_buffer(unpacker, path_id); + message::unpack_buffer(unpacker, path_hash); + message::unpack_buffer(unpacker, prev_path_hash); + message::unpack_buffer(unpacker, paths); + message::unpack_buffer(unpacker, returning); + message::unpack_buffer(unpacker, prev_node); +} + +// state +discover_all_paths_substate :: discover_all_paths_substate() + : outstanding_count(0) +{ } + +uint64_t +discover_all_paths_substate :: size() const +{ + return message::size(outstanding_count) + + message::size(prev_node) + + message::size(paths) + + message::size(path_len) + + message::size(prev_path_hash) + + message::size(path_hash) + + message::size(path_id) + ; +} + +void +discover_all_paths_substate :: pack(e::packer &packer) const +{ + message::pack_buffer(packer, outstanding_count); + message::pack_buffer(packer, prev_node); + message::pack_buffer(packer, paths); + message::pack_buffer(packer, path_len); + message::pack_buffer(packer, prev_path_hash); + message::pack_buffer(packer, path_hash); + message::pack_buffer(packer, path_id); +} + +void +discover_all_paths_substate :: unpack(e::unpacker &unpacker) +{ + message::unpack_buffer(unpacker, outstanding_count); + message::unpack_buffer(unpacker, prev_node); + message::unpack_buffer(unpacker, paths); + message::unpack_buffer(unpacker, path_len); + message::unpack_buffer(unpacker, prev_path_hash); + message::unpack_buffer(unpacker, path_hash); + message::unpack_buffer(unpacker, path_id); +} + +void +discover_all_paths_substate::get_prev_substate_identifier(discover_all_paths_params ¶ms) +{ + params.path_hash = prev_path_hash; + /* copy path handle */ + params.path_id = path_id; + params.path_id.pop(); +} + +discover_all_paths_state :: discover_all_paths_state() +{ } + +uint64_t +discover_all_paths_state :: size() const +{ + return message::size(vmap); +} + +void +discover_all_paths_state :: pack(e::packer &packer) const +{ + message::pack_buffer(packer, vmap); +} + +void +discover_all_paths_state :: unpack(e::unpacker &unpacker) +{ + message::unpack_buffer(unpacker, vmap); +} + +static uint32_t incremental_bkdr_hash(uint32_t hv, const node_handle_t &node) +{ + static const uint32_t seed = 131; + for (char ch: node) + hv = hv * seed + ch; + hv = hv * seed + '\0'; + return hv; +} + +discover_all_paths_substate *discover_all_paths_state::\ + get_substate(const discover_all_paths_params ¶ms, + bool create = false) +{ + auto iter = vmap.find(params.path_hash); + if (create) + { + std::vector *substates; + if (iter == vmap.end()) + { + substates = &(vmap.emplace(params.path_hash, std::vector()).first->second); + } + else + substates = &(iter->second); + substates->emplace_back(); + discover_all_paths_substate *substate = &*substates->rbegin(); + return substate; + } + else + { + if (iter == vmap.end()) + return nullptr; + for (auto &substate: iter->second) { + if (substate.path_id == substate.path_id) + return &substate; + } + return nullptr; + } +} + +static void +_state_paths_to_params_paths(const std::unordered_map &state_paths, + std::unordered_map> ¶ms_paths) +{ + params_paths.clear(); + for (const auto &p: state_paths) { + std::vector &evec = params_paths[p.first]; + evec.reserve(p.second.size()); + for (const cl::edge &e: p.second) { + evec.emplace_back(e); + } + } +} + +void node_prog::path_handle::pop() { + if (!nodes.empty()) + { + nodes.pop_back(); + edges.pop_back(); + } +} + +uint64_t node_prog::path_handle::size() const { + return message::size(nodes) + message::size(edges); +} + +void node_prog::path_handle::pack(e::packer &packer) const { + message::pack_buffer(packer, nodes); + message::pack_buffer(packer, edges); +} + +void node_prog::path_handle::unpack(e::unpacker &unpacker) { + message::unpack_buffer(unpacker, nodes); + message::unpack_buffer(unpacker, edges); +} + +bool node_prog::path_handle::operator==(const path_handle &b) const { + if (nodes.size() != b.nodes.size()) + return false; + for (size_t i = 0; i < nodes.size(); i++) + if (nodes[i] != b.nodes[i] || edges[i] != b.edges[i]) + return false; + return true; +} + +std::pair>> +node_prog :: discover_all_paths_node_program(node_prog::node &n, + db::remote_node &rn, + discover_all_paths_params ¶ms, + std::function state_getter, + std::function, std::shared_ptr>, cache_key_t)>&, + cache_response*) +{ + discover_all_paths_state &state = state_getter(); + /* node progs to trigger next */ + std::vector> next; + node_handle_t cur_handle = n.get_handle(); + + if (!params.returning) { + // request spreading out + + discover_all_paths_substate *ret = state.get_substate(params); + if (ret != nullptr) { + /* node with the same substate already visited */ + assert(0 && "impossible"); + } else { + // visit this node now + discover_all_paths_substate &substate = *state.get_substate(params, true); + substate.path_len = params.path_len; + substate.path_hash = params.path_hash; + substate.prev_path_hash = params.prev_path_hash; + substate.path_id = params.path_id; + substate.prev_node = params.prev_node; + + if (!n.has_all_predicates(params.node_preds)) { + // node does not have all required properties, return immediately + params.returning = true; + substate.get_prev_substate_identifier(params); + assert(params.paths.empty()); + next.emplace_back(std::make_pair(params.prev_node, params)); + } else { + /* already reaches the target */ + if (params.dest == cur_handle || n.is_alias(params.dest)) { + params.returning = true; + substate.get_prev_substate_identifier(params); + substate.paths[cur_handle] = edge_set(); + _state_paths_to_params_paths(substate.paths, params.paths); + next.emplace_back(std::make_pair(params.prev_node, params)); + } else if (params.path_len) { + uint32_t prev_path_len = params.path_len; + discover_all_paths_params params0 = params; + params.prev_node = rn; + params.ancestors.emplace(cur_handle); + params.path_id.nodes.emplace_back(cur_handle); + params.prev_path_hash = params.path_hash; + double path_hash = incremental_bkdr_hash(params.path_hash, + cur_handle); + bool has_child = false; + + for (edge &e: n.get_edges()) { + const db::remote_node &nbr = e.get_neighbor(); + if (params.ancestors.find(nbr.handle) == params.ancestors.end() + && e.has_all_predicates(params.edge_preds)) { + params.path_len = prev_path_len - 1; + params.path_id.edges.emplace_back(e.get_handle()); + params.path_hash = incremental_bkdr_hash(path_hash, e.get_handle()); + next.emplace_back(std::make_pair(nbr, params)); + params.path_id.edges.pop_back(); /* revert to original edges */ + substate.outstanding_count++; + has_child = true; + } + } + + if (!has_child) { + params0.returning = true; + substate.get_prev_substate_identifier(params0); + next.emplace_back(std::make_pair(params0.prev_node, params0)); + } + } else { /* run out of path length */ + params.returning = true; + substate.get_prev_substate_identifier(params); + assert(params.paths.empty()); + next.emplace_back(std::make_pair(params.prev_node, params)); + } + } + } + + } else { + // request returning to start node + + discover_all_paths_substate *ret = state.get_substate(params); + assert(ret != nullptr); + discover_all_paths_substate &substate = *ret; + std::unordered_map new_paths; + auto &spaths = substate.paths; + auto &ppaths = params.paths; + /* merge results from child node */ + if (!params.paths.empty()) { + for (const auto &p: ppaths) { + if (spaths.find(p.first) == spaths.end()) + spaths.emplace(p.first, edge_set()); + edge_set &eset = spaths[p.first]; + for (const cl::edge &cl_e: p.second) + eset.emplace(cl_e); + } + /* add edges to children */ + edge_set &eset = spaths[cur_handle]; + for (edge &e: n.get_edges()) { + node_handle_t nbr = e.get_neighbor().handle; + if (e.has_all_predicates(params.edge_preds) && spaths.find(nbr) != spaths.end()) { + cl::edge cl_e; + e.get_client_edge(n.get_handle(), cl_e); + eset.emplace(cl_e); + } + } + } + + if (--substate.outstanding_count == 0) { + substate.get_prev_substate_identifier(params); + _state_paths_to_params_paths(spaths, ppaths); + next.emplace_back(std::make_pair(substate.prev_node, params)); + } + } + + return std::make_pair(search_type::BREADTH_FIRST, next); +} diff --git a/node_prog/discover_all_paths.h b/node_prog/discover_all_paths.h new file mode 100644 index 0000000..eded348 --- /dev/null +++ b/node_prog/discover_all_paths.h @@ -0,0 +1,126 @@ +/* + * =============================================================== + * Description: Discover all paths between two vertices + * predicated on max path len, node properties, edge + * properties. (Note that the other `discover_paths` node + * program could ignore some paths while searching, + * although it offers better time complexity) + * + * Author: Ted Yin, ted.sybil@gmail.com + * + * Copyright (C) 2015, Cornell University, see the LICENSE file + * for licensing agreement + * =============================================================== + */ + +#ifndef weaver_node_prog_discover_all_paths_h_ +#define weaver_node_prog_discover_all_paths_h_ + +#include + +#include "common/property_predicate.h" +#include "db/remote_node.h" +#include "node_prog/node.h" +#include "node_prog/edge.h" +#include "node_prog/base_classes.h" +#include "node_prog/cache_response.h" + +namespace node_prog +{ + using edge_set = std::unordered_set; + using anc_set = std::unordered_set; + + /* the identifier of paths (demonstrate how to create + * additional data type for storing local information + * for each node) */ + struct path_handle: public virtual Node_State_Base { + std::vector nodes; + std::vector edges; + + void pop(); + uint64_t size() const; + void pack(e::packer&) const; + void unpack(e::unpacker&); + bool operator==(const path_handle &b) const; + }; + + /* the parameters passed to a node */ + struct discover_all_paths_params: public virtual Node_Parameters_Base + { + node_handle_t dest; + /* the remaining path length */ + uint32_t path_len; + /* node & edge predicates */ + std::vector node_preds; + std::vector edge_preds; + /* for checking cycles */ + anc_set ancestors; + /* for identifying different path through a node */ + path_handle path_id; + uint32_t path_hash; + uint32_t prev_path_hash; + /* path results */ + std::unordered_map> paths; + /* whether the search is in returning phase */ + bool returning; + /* the node that propagates to the current node */ + db::remote_node prev_node; + + discover_all_paths_params(); + ~discover_all_paths_params() { } + uint64_t size() const; + void pack(e::packer&) const; + void unpack(e::unpacker&); + + // no caching + bool search_cache() { return false; } + cache_key_t cache_key() { return cache_key_t(); } + }; + + /* the substate describing a conceptual searching path */ + struct discover_all_paths_substate: public virtual Node_State_Base + { + uint32_t outstanding_count; + db::remote_node prev_node; + /* result accumulator */ + std::unordered_map paths; + uint32_t path_len; + uint32_t prev_path_hash; + uint32_t path_hash; + path_handle path_id; + + void get_prev_substate_identifier(discover_all_paths_params ¶ms); + discover_all_paths_substate(); + ~discover_all_paths_substate() { } + uint64_t size() const; + void pack(e::packer&) const; + void unpack(e::unpacker&); + }; + + /* overall state of a node */ + struct discover_all_paths_state: public virtual Node_State_Base + { + /* maps remaining path len to a set of substates, + * each of which represents a conceptual searching path */ + std::unordered_map> vmap; + + discover_all_paths_state(); + ~discover_all_paths_state() { } + uint64_t size() const; + void pack(e::packer&) const; + void unpack(e::unpacker&); + discover_all_paths_substate *get_substate(const discover_all_paths_params ¶ms, bool create); + }; + + std::pair>> + discover_all_paths_node_program(node &n, /* current node */ + db::remote_node &rn, /* current node for access remotely */ + discover_all_paths_params ¶ms, /* passed parameters */ + std::function state_getter, /* the function to get state */ + std::function, + std::shared_ptr>, cache_key_t)>&, + /* cache for each node */ + cache_response*); +} + +#endif diff --git a/node_prog/node_prog_type.h b/node_prog/node_prog_type.h index fc9a6c6..41957c5 100644 --- a/node_prog/node_prog_type.h +++ b/node_prog/node_prog_type.h @@ -44,6 +44,7 @@ namespace node_prog NODE_GET, TRAVERSE_PROPS, DISCOVER_PATHS, + DISCOVER_ALL_PATHS, GET_BTC_BLOCK, END }; diff --git a/node_prog/node_program.h b/node_prog/node_program.h index 4861fc7..b7a8a7b 100644 --- a/node_prog/node_program.h +++ b/node_prog/node_program.h @@ -41,6 +41,7 @@ #include "node_prog/two_neighborhood_program.h" #include "node_prog/traverse_with_props.h" #include "node_prog/discover_paths.h" +#include "node_prog/discover_all_paths.h" #include "node_prog/get_btc_block.h" namespace coordinator @@ -173,6 +174,9 @@ namespace node_prog new particular_node_program(TRAVERSE_PROPS, node_prog::traverse_props_node_program) }, { DISCOVER_PATHS, new particular_node_program(DISCOVER_PATHS, node_prog::discover_paths_node_program) }, + { DISCOVER_ALL_PATHS, + new particular_node_program(DISCOVER_PATHS, node_prog::discover_all_paths_node_program) }, + { GET_BTC_BLOCK, new particular_node_program(GET_BTC_BLOCK, node_prog::get_btc_block_node_program) }, };