Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions phlex/app/load_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
#include "boost/dll/import.hpp"
#include "boost/json.hpp"

#include <cstdlib>
#include <filesystem>
#include <functional>
#include <stdexcept>
#include <string>

using namespace std::string_literals;
Expand All @@ -21,12 +24,13 @@ namespace phlex::experimental {
std::vector<std::function<detail::module_creator_t>> create_module;
std::function<detail::source_creator_t> create_source;

template <typename creator_t>
std::function<creator_t> plugin_loader(std::string const& spec, std::string const& symbol_name)
template <typename CreatorT>
std::function<CreatorT> plugin_loader(std::string const& spec, std::string const& symbol_name)
{
char const* plugin_path_ptr = std::getenv("PHLEX_PLUGIN_PATH");
if (!plugin_path_ptr)
if (!plugin_path_ptr) {
throw std::runtime_error("PHLEX_PLUGIN_PATH has not been set.");
}

using namespace boost;
std::vector<std::string> subdirs;
Expand All @@ -37,7 +41,7 @@ namespace phlex::experimental {
std::filesystem::path shared_library_path{subdir};
shared_library_path /= "lib" + spec + ".so";
if (exists(shared_library_path)) {
return dll::import_alias<creator_t>(shared_library_path, symbol_name);
return dll::import_alias<CreatorT>(shared_library_path, symbol_name);
}
}
throw std::runtime_error("Could not locate library with specification '"s + spec +
Expand Down
5 changes: 3 additions & 2 deletions phlex/configuration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@
#include "boost/json.hpp"

#include <optional>
#include <utility>

namespace phlex::experimental {
class configuration {
public:
configuration() = default;
explicit configuration(boost::json::object const& config) : config_{config} {}
explicit configuration(boost::json::object config) : config_{std::move(config)} {}

template <typename T>
std::optional<T> get_if_present(std::string const& key) const
{
if (auto pkey = config_.if_contains(key)) {
if (auto const* pkey = config_.if_contains(key)) {
return value_to<T>(*pkey);
}
return std::nullopt;
Expand Down
5 changes: 5 additions & 0 deletions phlex/core/consumer.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
#include "phlex/core/consumer.hpp"
#include "phlex/model/algorithm_name.hpp"

#include <string>
#include <utility>
#include <vector>

namespace phlex::experimental {
consumer::consumer(algorithm_name name, std::vector<std::string> predicates) :
Expand Down
44 changes: 24 additions & 20 deletions phlex/core/declared_fold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace phlex::experimental {
declared_fold(algorithm_name name,
std::vector<std::string> predicates,
specified_labels input_products);
virtual ~declared_fold();
~declared_fold() override;

virtual tbb::flow::sender<message>& sender() = 0;
virtual tbb::flow::sender<message>& to_output() = 0;
Expand All @@ -55,10 +55,10 @@ namespace phlex::experimental {
class fold_node : public declared_fold, private count_stores {
using all_parameter_types = typename AlgorithmBits::input_parameter_types;
using input_parameter_types = skip_first_type<all_parameter_types>; // Skip fold object
static constexpr auto N = std::tuple_size_v<input_parameter_types>;
using R = std::decay_t<std::tuple_element_t<0, all_parameter_types>>;
static constexpr auto number_inputs = std::tuple_size_v<input_parameter_types>;
using result_type = std::decay_t<std::tuple_element_t<0, all_parameter_types>>;

static constexpr std::size_t M = 1; // hard-coded for now
static constexpr std::size_t number_output_products = 1; // hard-coded for now
using function_t = typename AlgorithmBits::bound_type;

public:
Expand All @@ -75,10 +75,11 @@ namespace phlex::experimental {
initializer_{std::move(initializer)},
output_{to_qualified_names(full_name(), std::move(output))},
partition_{std::move(partition)},
join_{make_join_or_none(g, std::make_index_sequence<N>{})},
join_{make_join_or_none(g, std::make_index_sequence<number_inputs>{})},
fold_{g,
concurrency,
[this, ft = alg.release_algorithm()](messages_t<N> const& messages, auto& outputs) {
[this, ft = alg.release_algorithm()](messages_t<number_inputs> const& messages,
auto& outputs) {
// N.B. The assumption is that a fold will *never* need to cache
// the product store it creates. Any flush messages *do not* need
// to be propagated to downstream nodes.
Expand All @@ -104,13 +105,13 @@ namespace phlex::experimental {
if (store->is_flush()) {
counter_for(id_hash_for_counter).set_flush_value(store, original_message_id);
} else {
call(ft, messages, std::make_index_sequence<N>{});
call(ft, messages, std::make_index_sequence<number_inputs>{});
counter_for(id_hash_for_counter).increment(store->id()->level_hash());
}

if (auto counter = done_with(id_hash_for_counter)) {
auto parent = fold_store->make_continuation(this->full_name());
commit_(*parent);
commit(*parent);
++product_count_;
// FIXME: This msg.eom value may be wrong!
get<0>(outputs).try_put({parent, msg.eom, counter->original_message_id()});
Expand All @@ -123,17 +124,22 @@ namespace phlex::experimental {
private:
tbb::flow::receiver<message>& port_for(specified_label const& product_label) override
{
return receiver_for<N>(join_, input(), product_label);
return receiver_for<number_inputs>(join_, input(), product_label);
}

std::vector<tbb::flow::receiver<message>*> ports() override { return input_ports<N>(join_); }
std::vector<tbb::flow::receiver<message>*> ports() override
{
return input_ports<number_inputs>(join_);
}

tbb::flow::sender<message>& sender() override { return output_port<0ull>(fold_); }
tbb::flow::sender<message>& to_output() override { return sender(); }
qualified_names const& output() const override { return output_; }

template <std::size_t... Is>
void call(function_t const& ft, messages_t<N> const& messages, std::index_sequence<Is...>)
void call(function_t const& ft,
messages_t<number_inputs> const& messages,
std::index_sequence<Is...>)
{
auto const& parent_id = *most_derived(messages).store->id()->parent(partition_);
// FIXME: Not the safest approach!
Expand All @@ -142,8 +148,7 @@ namespace phlex::experimental {
it =
results_
.insert({parent_id,
initialized_object(std::move(initializer_),
std::make_index_sequence<std::tuple_size_v<InitTuple>>{})})
initialized_object(std::make_index_sequence<std::tuple_size_v<InitTuple>>{})})
.first;
}
++calls_;
Expand All @@ -154,13 +159,12 @@ namespace phlex::experimental {
std::size_t product_count() const final { return product_count_.load(); }

template <size_t... Is>
auto initialized_object(InitTuple&& tuple, std::index_sequence<Is...>) const
auto initialized_object(std::index_sequence<Is...>) const
{
return std::unique_ptr<R>{
new R{std::forward<std::tuple_element_t<Is, InitTuple>>(std::get<Is>(tuple))...}};
return std::unique_ptr<result_type>{new result_type{std::get<Is>(initializer_)...}};
}

void commit_(product_store& store)
void commit(product_store& store)
{
auto& result = results_.at(*store.id());
if constexpr (requires { send(*result); }) {
Expand All @@ -177,9 +181,9 @@ namespace phlex::experimental {
input_retriever_types<input_parameter_types> input_{input_arguments<input_parameter_types>()};
qualified_names output_;
std::string partition_;
join_or_none_t<N> join_;
tbb::flow::multifunction_node<messages_t<N>, messages_t<1>> fold_;
tbb::concurrent_unordered_map<level_id, std::unique_ptr<R>> results_;
join_or_none_t<number_inputs> join_;
tbb::flow::multifunction_node<messages_t<number_inputs>, messages_t<1>> fold_;
tbb::concurrent_unordered_map<level_id, std::unique_ptr<result_type>> results_;
std::atomic<std::size_t> calls_;
std::atomic<std::size_t> product_count_;
};
Expand Down
39 changes: 26 additions & 13 deletions phlex/core/declared_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace phlex::experimental {
declared_observer(algorithm_name name,
std::vector<std::string> predicates,
specified_labels input_products);
virtual ~declared_observer();
~declared_observer() override;

protected:
using hashes_t = tbb::concurrent_hash_map<level_id::hash_type, bool>;
Expand All @@ -51,9 +51,9 @@ namespace phlex::experimental {

template <typename AlgorithmBits>
class observer_node : public declared_observer, private detect_flush_flag {
using InputArgs = typename AlgorithmBits::input_parameter_types;
using input_args = typename AlgorithmBits::input_parameter_types;
using function_t = typename AlgorithmBits::bound_type;
static constexpr auto N = AlgorithmBits::number_inputs;
static constexpr auto number_inputs = AlgorithmBits::number_inputs;

public:
static constexpr auto number_output_products = 0;
Expand All @@ -66,17 +66,17 @@ namespace phlex::experimental {
AlgorithmBits alg,
specified_labels input_products) :
declared_observer{std::move(name), std::move(predicates), std::move(input_products)},
join_{make_join_or_none(g, std::make_index_sequence<N>{})},
join_{make_join_or_none(g, std::make_index_sequence<number_inputs>{})},
observer_{g,
concurrency,
[this, ft = alg.release_algorithm()](
messages_t<N> const& messages) -> oneapi::tbb::flow::continue_msg {
messages_t<number_inputs> const& messages) -> oneapi::tbb::flow::continue_msg {
auto const& msg = most_derived(messages);
auto const& [store, message_id] = std::tie(msg.store, msg.id);
if (store->is_flush()) {
flag_for(store->id()->hash()).flush_received(message_id);
} else if (accessor a; needs_new(store, a)) {
call(ft, messages, std::make_index_sequence<N>{});
call(ft, messages, std::make_index_sequence<number_inputs>{});
a->second = true;
flag_for(store->id()->hash()).mark_as_processed();
}
Expand All @@ -90,15 +90,26 @@ namespace phlex::experimental {
make_edge(join_, observer_);
}

~observer_node() { report_cached_hashes(cached_hashes_); }
~observer_node() override { report_cached_hashes(cached_hashes_); }

observer_node(observer_node const&) = default;
observer_node& operator=(observer_node const&) = default;

// NOLINTBEGIN(cppcoreguidelines-noexcept-move-operations,performance-noexcept-move-constructor)
observer_node(observer_node&&) = default;
observer_node& operator=(observer_node&&) = default;
// NOLINTEND(cppcoreguidelines-noexcept-move-operations,performance-noexcept-move-constructor)

private:
tbb::flow::receiver<message>& port_for(specified_label const& product_label) override
{
return receiver_for<N>(join_, input(), product_label);
return receiver_for<number_inputs>(join_, input(), product_label);
}

std::vector<tbb::flow::receiver<message>*> ports() override { return input_ports<N>(join_); }
std::vector<tbb::flow::receiver<message>*> ports() override
{
return input_ports<number_inputs>(join_);
}

bool needs_new(product_store_const_ptr const& store, accessor& a)
{
Expand All @@ -109,17 +120,19 @@ namespace phlex::experimental {
}

template <std::size_t... Is>
void call(function_t const& ft, messages_t<N> const& messages, std::index_sequence<Is...>)
void call(function_t const& ft,
messages_t<number_inputs> const& messages,
std::index_sequence<Is...>)
{
++calls_;
return std::invoke(ft, std::get<Is>(input_).retrieve(messages)...);
}

std::size_t num_calls() const final { return calls_.load(); }

input_retriever_types<InputArgs> input_{input_arguments<InputArgs>()};
join_or_none_t<N> join_;
tbb::flow::function_node<messages_t<N>> observer_;
input_retriever_types<input_args> input_{input_arguments<input_args>()};
join_or_none_t<number_inputs> join_;
tbb::flow::function_node<messages_t<number_inputs>> observer_;
hashes_t cached_hashes_;
std::atomic<std::size_t> calls_;
};
Expand Down
Loading
Loading