diff --git a/phlex/app/load_module.cpp b/phlex/app/load_module.cpp index ffdbcfd8..4a6d3f11 100644 --- a/phlex/app/load_module.cpp +++ b/phlex/app/load_module.cpp @@ -8,7 +8,10 @@ #include "boost/dll/import.hpp" #include "boost/json.hpp" +#include +#include #include +#include #include using namespace std::string_literals; @@ -21,12 +24,13 @@ namespace phlex::experimental { std::vector> create_module; std::function create_source; - template - std::function plugin_loader(std::string const& spec, std::string const& symbol_name) + template + std::function plugin_loader(std::string const& spec, std::string const& symbol_name) { char const* plugin_path_ptr = std::getenv("PHLEX_PLUGIN_PATH"); - if (!plugin_path_ptr) + if (!plugin_path_ptr) { throw std::runtime_error("PHLEX_PLUGIN_PATH has not been set."); + } using namespace boost; std::vector subdirs; @@ -37,7 +41,7 @@ namespace phlex::experimental { std::filesystem::path shared_library_path{subdir}; shared_library_path /= "lib" + spec + ".so"; if (exists(shared_library_path)) { - return dll::import_alias(shared_library_path, symbol_name); + return dll::import_alias(shared_library_path, symbol_name); } } throw std::runtime_error("Could not locate library with specification '"s + spec + diff --git a/phlex/configuration.hpp b/phlex/configuration.hpp index 30f4406f..19658c10 100644 --- a/phlex/configuration.hpp +++ b/phlex/configuration.hpp @@ -4,17 +4,18 @@ #include "boost/json.hpp" #include +#include namespace phlex::experimental { class configuration { public: configuration() = default; - explicit configuration(boost::json::object const& config) : config_{config} {} + explicit configuration(boost::json::object config) : config_{std::move(config)} {} template std::optional get_if_present(std::string const& key) const { - if (auto pkey = config_.if_contains(key)) { + if (auto const* pkey = config_.if_contains(key)) { return value_to(*pkey); } return std::nullopt; diff --git a/phlex/core/consumer.cpp b/phlex/core/consumer.cpp index f02a89d5..8e57506d 100644 --- a/phlex/core/consumer.cpp +++ b/phlex/core/consumer.cpp @@ -1,4 +1,9 @@ #include "phlex/core/consumer.hpp" +#include "phlex/model/algorithm_name.hpp" + +#include +#include +#include namespace phlex::experimental { consumer::consumer(algorithm_name name, std::vector predicates) : diff --git a/phlex/core/declared_fold.hpp b/phlex/core/declared_fold.hpp index 2295ea71..a670cf57 100644 --- a/phlex/core/declared_fold.hpp +++ b/phlex/core/declared_fold.hpp @@ -38,7 +38,7 @@ namespace phlex::experimental { declared_fold(algorithm_name name, std::vector predicates, specified_labels input_products); - virtual ~declared_fold(); + ~declared_fold() override; virtual tbb::flow::sender& sender() = 0; virtual tbb::flow::sender& to_output() = 0; @@ -55,10 +55,10 @@ namespace phlex::experimental { class fold_node : public declared_fold, private count_stores { using all_parameter_types = typename AlgorithmBits::input_parameter_types; using input_parameter_types = skip_first_type; // Skip fold object - static constexpr auto N = std::tuple_size_v; - using R = std::decay_t>; + static constexpr auto number_inputs = std::tuple_size_v; + using result_type = std::decay_t>; - static constexpr std::size_t M = 1; // hard-coded for now + static constexpr std::size_t number_output_products = 1; // hard-coded for now using function_t = typename AlgorithmBits::bound_type; public: @@ -75,10 +75,11 @@ namespace phlex::experimental { initializer_{std::move(initializer)}, output_{to_qualified_names(full_name(), std::move(output))}, partition_{std::move(partition)}, - join_{make_join_or_none(g, std::make_index_sequence{})}, + join_{make_join_or_none(g, std::make_index_sequence{})}, fold_{g, concurrency, - [this, ft = alg.release_algorithm()](messages_t const& messages, auto& outputs) { + [this, ft = alg.release_algorithm()](messages_t const& messages, + auto& outputs) { // N.B. The assumption is that a fold will *never* need to cache // the product store it creates. Any flush messages *do not* need // to be propagated to downstream nodes. @@ -104,13 +105,13 @@ namespace phlex::experimental { if (store->is_flush()) { counter_for(id_hash_for_counter).set_flush_value(store, original_message_id); } else { - call(ft, messages, std::make_index_sequence{}); + call(ft, messages, std::make_index_sequence{}); counter_for(id_hash_for_counter).increment(store->id()->level_hash()); } if (auto counter = done_with(id_hash_for_counter)) { auto parent = fold_store->make_continuation(this->full_name()); - commit_(*parent); + commit(*parent); ++product_count_; // FIXME: This msg.eom value may be wrong! get<0>(outputs).try_put({parent, msg.eom, counter->original_message_id()}); @@ -123,17 +124,22 @@ namespace phlex::experimental { private: tbb::flow::receiver& port_for(specified_label const& product_label) override { - return receiver_for(join_, input(), product_label); + return receiver_for(join_, input(), product_label); } - std::vector*> ports() override { return input_ports(join_); } + std::vector*> ports() override + { + return input_ports(join_); + } tbb::flow::sender& sender() override { return output_port<0ull>(fold_); } tbb::flow::sender& to_output() override { return sender(); } qualified_names const& output() const override { return output_; } template - void call(function_t const& ft, messages_t const& messages, std::index_sequence) + void call(function_t const& ft, + messages_t const& messages, + std::index_sequence) { auto const& parent_id = *most_derived(messages).store->id()->parent(partition_); // FIXME: Not the safest approach! @@ -142,8 +148,7 @@ namespace phlex::experimental { it = results_ .insert({parent_id, - initialized_object(std::move(initializer_), - std::make_index_sequence>{})}) + initialized_object(std::make_index_sequence>{})}) .first; } ++calls_; @@ -154,13 +159,12 @@ namespace phlex::experimental { std::size_t product_count() const final { return product_count_.load(); } template - auto initialized_object(InitTuple&& tuple, std::index_sequence) const + auto initialized_object(std::index_sequence) const { - return std::unique_ptr{ - new R{std::forward>(std::get(tuple))...}}; + return std::unique_ptr{new result_type{std::get(initializer_)...}}; } - void commit_(product_store& store) + void commit(product_store& store) { auto& result = results_.at(*store.id()); if constexpr (requires { send(*result); }) { @@ -177,9 +181,9 @@ namespace phlex::experimental { input_retriever_types input_{input_arguments()}; qualified_names output_; std::string partition_; - join_or_none_t join_; - tbb::flow::multifunction_node, messages_t<1>> fold_; - tbb::concurrent_unordered_map> results_; + join_or_none_t join_; + tbb::flow::multifunction_node, messages_t<1>> fold_; + tbb::concurrent_unordered_map> results_; std::atomic calls_; std::atomic product_count_; }; diff --git a/phlex/core/declared_observer.hpp b/phlex/core/declared_observer.hpp index 5488507c..a1b642d1 100644 --- a/phlex/core/declared_observer.hpp +++ b/phlex/core/declared_observer.hpp @@ -35,7 +35,7 @@ namespace phlex::experimental { declared_observer(algorithm_name name, std::vector predicates, specified_labels input_products); - virtual ~declared_observer(); + ~declared_observer() override; protected: using hashes_t = tbb::concurrent_hash_map; @@ -51,9 +51,9 @@ namespace phlex::experimental { template class observer_node : public declared_observer, private detect_flush_flag { - using InputArgs = typename AlgorithmBits::input_parameter_types; + using input_args = typename AlgorithmBits::input_parameter_types; using function_t = typename AlgorithmBits::bound_type; - static constexpr auto N = AlgorithmBits::number_inputs; + static constexpr auto number_inputs = AlgorithmBits::number_inputs; public: static constexpr auto number_output_products = 0; @@ -66,17 +66,17 @@ namespace phlex::experimental { AlgorithmBits alg, specified_labels input_products) : declared_observer{std::move(name), std::move(predicates), std::move(input_products)}, - join_{make_join_or_none(g, std::make_index_sequence{})}, + join_{make_join_or_none(g, std::make_index_sequence{})}, observer_{g, concurrency, [this, ft = alg.release_algorithm()]( - messages_t const& messages) -> oneapi::tbb::flow::continue_msg { + messages_t const& messages) -> oneapi::tbb::flow::continue_msg { auto const& msg = most_derived(messages); auto const& [store, message_id] = std::tie(msg.store, msg.id); if (store->is_flush()) { flag_for(store->id()->hash()).flush_received(message_id); } else if (accessor a; needs_new(store, a)) { - call(ft, messages, std::make_index_sequence{}); + call(ft, messages, std::make_index_sequence{}); a->second = true; flag_for(store->id()->hash()).mark_as_processed(); } @@ -90,15 +90,26 @@ namespace phlex::experimental { make_edge(join_, observer_); } - ~observer_node() { report_cached_hashes(cached_hashes_); } + ~observer_node() override { report_cached_hashes(cached_hashes_); } + + observer_node(observer_node const&) = default; + observer_node& operator=(observer_node const&) = default; + + // NOLINTBEGIN(cppcoreguidelines-noexcept-move-operations,performance-noexcept-move-constructor) + observer_node(observer_node&&) = default; + observer_node& operator=(observer_node&&) = default; + // NOLINTEND(cppcoreguidelines-noexcept-move-operations,performance-noexcept-move-constructor) private: tbb::flow::receiver& port_for(specified_label const& product_label) override { - return receiver_for(join_, input(), product_label); + return receiver_for(join_, input(), product_label); } - std::vector*> ports() override { return input_ports(join_); } + std::vector*> ports() override + { + return input_ports(join_); + } bool needs_new(product_store_const_ptr const& store, accessor& a) { @@ -109,7 +120,9 @@ namespace phlex::experimental { } template - void call(function_t const& ft, messages_t const& messages, std::index_sequence) + void call(function_t const& ft, + messages_t const& messages, + std::index_sequence) { ++calls_; return std::invoke(ft, std::get(input_).retrieve(messages)...); @@ -117,9 +130,9 @@ namespace phlex::experimental { std::size_t num_calls() const final { return calls_.load(); } - input_retriever_types input_{input_arguments()}; - join_or_none_t join_; - tbb::flow::function_node> observer_; + input_retriever_types input_{input_arguments()}; + join_or_none_t join_; + tbb::flow::function_node> observer_; hashes_t cached_hashes_; std::atomic calls_; }; diff --git a/phlex/core/declared_predicate.hpp b/phlex/core/declared_predicate.hpp index d8e6a0c0..2f721440 100644 --- a/phlex/core/declared_predicate.hpp +++ b/phlex/core/declared_predicate.hpp @@ -38,7 +38,7 @@ namespace phlex::experimental { declared_predicate(algorithm_name name, std::vector predicates, specified_labels input_products); - virtual ~declared_predicate(); + ~declared_predicate() override; virtual tbb::flow::sender& sender() = 0; @@ -57,9 +57,9 @@ namespace phlex::experimental { template class predicate_node : public declared_predicate, private detect_flush_flag { - using InputArgs = typename AlgorithmBits::input_parameter_types; + using input_args = typename AlgorithmBits::input_parameter_types; using function_t = typename AlgorithmBits::bound_type; - static constexpr auto N = AlgorithmBits::number_inputs; + static constexpr auto number_inputs = AlgorithmBits::number_inputs; public: static constexpr auto number_output_products = 0ull; @@ -72,47 +72,60 @@ namespace phlex::experimental { AlgorithmBits alg, specified_labels input_products) : declared_predicate{std::move(name), std::move(predicates), std::move(input_products)}, - join_{make_join_or_none(g, std::make_index_sequence{})}, - predicate_{ - g, - concurrency, - [this, ft = alg.release_algorithm()](messages_t const& messages) -> predicate_result { - auto const& msg = most_derived(messages); - auto const& [store, message_id] = std::tie(msg.store, msg.id); - predicate_result result{}; - if (store->is_flush()) { - flag_for(store->id()->hash()).flush_received(message_id); - } else if (const_accessor a; results_.find(a, store->id()->hash())) { - result = {msg.eom, message_id, a->second.result}; - } else if (accessor a; results_.insert(a, store->id()->hash())) { - bool const rc = call(ft, messages, std::make_index_sequence{}); - result = a->second = {msg.eom, message_id, rc}; - flag_for(store->id()->hash()).mark_as_processed(); - } - - if (done_with(store)) { - results_.erase(store->id()->hash()); - } - return result; - }} + join_{make_join_or_none(g, std::make_index_sequence{})}, + predicate_{g, + concurrency, + [this, ft = alg.release_algorithm()]( + messages_t const& messages) -> predicate_result { + auto const& msg = most_derived(messages); + auto const& [store, message_id] = std::tie(msg.store, msg.id); + predicate_result result{}; + if (store->is_flush()) { + flag_for(store->id()->hash()).flush_received(message_id); + } else if (const_accessor a; results_.find(a, store->id()->hash())) { + result = {msg.eom, message_id, a->second.result}; + } else if (accessor a; results_.insert(a, store->id()->hash())) { + bool const rc = call(ft, messages, std::make_index_sequence{}); + result = a->second = {msg.eom, message_id, rc}; + flag_for(store->id()->hash()).mark_as_processed(); + } + + if (done_with(store)) { + results_.erase(store->id()->hash()); + } + return result; + }} { make_edge(join_, predicate_); } - ~predicate_node() { report_cached_results(results_); } + ~predicate_node() override { report_cached_results(results_); } + + predicate_node(predicate_node const&) = default; + predicate_node& operator=(predicate_node const&) = default; + + // NOLINTBEGIN(cppcoreguidelines-noexcept-move-operations,performance-noexcept-move-constructor) + predicate_node(predicate_node&&) = default; + predicate_node& operator=(predicate_node&&) = default; + // NOLINTEND(cppcoreguidelines-noexcept-move-operations,performance-noexcept-move-constructor) private: tbb::flow::receiver& port_for(specified_label const& product_label) override { - return receiver_for(join_, input(), product_label); + return receiver_for(join_, input(), product_label); } - std::vector*> ports() override { return input_ports(join_); } + std::vector*> ports() override + { + return input_ports(join_); + } tbb::flow::sender& sender() override { return predicate_; } template - bool call(function_t const& ft, messages_t const& messages, std::index_sequence) + bool call(function_t const& ft, + messages_t const& messages, + std::index_sequence) { ++calls_; return std::invoke(ft, std::get(input_).retrieve(messages)...); @@ -120,9 +133,9 @@ namespace phlex::experimental { std::size_t num_calls() const final { return calls_.load(); } - input_retriever_types input_{input_arguments()}; - join_or_none_t join_; - tbb::flow::function_node, predicate_result> predicate_; + input_retriever_types input_{input_arguments()}; + join_or_none_t join_; + tbb::flow::function_node, predicate_result> predicate_; results_t results_; std::atomic calls_; }; diff --git a/phlex/core/declared_transform.hpp b/phlex/core/declared_transform.hpp index daa654de..08500044 100644 --- a/phlex/core/declared_transform.hpp +++ b/phlex/core/declared_transform.hpp @@ -42,7 +42,7 @@ namespace phlex::experimental { declared_transform(algorithm_name name, std::vector predicates, specified_labels input_products); - virtual ~declared_transform(); + ~declared_transform() override; virtual tbb::flow::sender& sender() = 0; virtual tbb::flow::sender& to_output() = 0; @@ -67,12 +67,11 @@ namespace phlex::experimental { using function_t = typename AlgorithmBits::bound_type; using input_parameter_types = typename AlgorithmBits::input_parameter_types; - static constexpr auto N = AlgorithmBits::number_inputs; - static constexpr auto M = number_output_objects; + static constexpr auto number_inputs = AlgorithmBits::number_inputs; public: using node_ptr_type = declared_transform_ptr; - static constexpr auto number_output_products = M; + static constexpr auto number_output_products = number_output_objects; transform_node(algorithm_name name, std::size_t concurrency, @@ -83,10 +82,11 @@ namespace phlex::experimental { std::vector output) : declared_transform{std::move(name), std::move(predicates), std::move(input_products)}, output_{to_qualified_names(full_name(), std::move(output))}, - join_{make_join_or_none(g, std::make_index_sequence{})}, + join_{make_join_or_none(g, std::make_index_sequence{})}, transform_{g, concurrency, - [this, ft = alg.release_algorithm()](messages_t const& messages, auto& output) { + [this, ft = alg.release_algorithm()](messages_t const& messages, + auto& output) { auto const& msg = most_derived(messages); auto const& [store, message_eom, message_id] = std::tie(msg.store, msg.eom, msg.id); @@ -98,7 +98,7 @@ namespace phlex::experimental { } else { accessor a; if (stores_.insert(a, store->id()->hash())) { - auto result = call(ft, messages, std::make_index_sequence{}); + auto result = call(ft, messages, std::make_index_sequence{}); ++calls_; ++product_count_[store->id()->level_hash()]; products new_products; @@ -123,22 +123,35 @@ namespace phlex::experimental { make_edge(join_, transform_); } - ~transform_node() { report_cached_stores(stores_); } + ~transform_node() override { report_cached_stores(stores_); } + + transform_node(transform_node const&) = default; + transform_node& operator=(transform_node const&) = default; + + // NOLINTBEGIN(cppcoreguidelines-noexcept-move-operations,performance-noexcept-move-constructor) + transform_node(transform_node&&) = default; + transform_node& operator=(transform_node&&) = default; + // NOLINTEND(cppcoreguidelines-noexcept-move-operations,performance-noexcept-move-constructor) private: tbb::flow::receiver& port_for(specified_label const& product_label) override { - return receiver_for(join_, input(), product_label); + return receiver_for(join_, input(), product_label); } - std::vector*> ports() override { return input_ports(join_); } + std::vector*> ports() override + { + return input_ports(join_); + } tbb::flow::sender& sender() override { return output_port<0>(transform_); } tbb::flow::sender& to_output() override { return output_port<1>(transform_); } qualified_names const& output() const override { return output_; } template - auto call(function_t const& ft, messages_t const& messages, std::index_sequence) + auto call(function_t const& ft, + messages_t const& messages, + std::index_sequence) { return std::invoke(ft, std::get(input_).retrieve(messages)...); } @@ -155,8 +168,8 @@ namespace phlex::experimental { input_retriever_types input_{input_arguments()}; qualified_names output_; - join_or_none_t join_; - tbb::flow::multifunction_node, messages_t<2u>> transform_; + join_or_none_t join_; + tbb::flow::multifunction_node, messages_t<2u>> transform_; stores_t stores_; std::atomic calls_; tbb::concurrent_unordered_map> product_count_; diff --git a/phlex/core/declared_unfold.hpp b/phlex/core/declared_unfold.hpp index 7b0477ba..f793f7a1 100644 --- a/phlex/core/declared_unfold.hpp +++ b/phlex/core/declared_unfold.hpp @@ -9,7 +9,6 @@ #include "phlex/core/products_consumer.hpp" #include "phlex/core/store_counters.hpp" #include "phlex/model/algorithm_name.hpp" -#include "phlex/model/handle.hpp" #include "phlex/model/level_id.hpp" #include "phlex/model/product_store.hpp" #include "phlex/model/qualified_name.hpp" @@ -27,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -58,7 +58,7 @@ namespace phlex::experimental { declared_unfold(algorithm_name name, std::vector predicates, specified_labels input_products); - virtual ~declared_unfold(); + ~declared_unfold() override; virtual tbb::flow::sender& sender() = 0; virtual tbb::flow::sender& to_output() = 0; @@ -80,9 +80,8 @@ namespace phlex::experimental { template class unfold_node : public declared_unfold, private detect_flush_flag { - using InputArgs = constructor_parameter_types; - static constexpr std::size_t N = std::tuple_size_v; - static constexpr std::size_t M = number_output_objects; + using input_args = constructor_parameter_types; + static constexpr std::size_t number_inputs = std::tuple_size_v; public: unfold_node(algorithm_name name, @@ -97,43 +96,60 @@ namespace phlex::experimental { declared_unfold{std::move(name), std::move(predicates), std::move(product_labels)}, output_{to_qualified_names(full_name(), std::move(output_products))}, new_level_name_{std::move(new_level_name)}, - join_{make_join_or_none(g, std::make_index_sequence{})}, - unfold_{ - g, - concurrency, - [this, p = std::move(predicate), ufold = std::move(unfold)](messages_t const& messages, - auto& output) { - auto const& msg = most_derived(messages); - auto const& store = msg.store; - if (store->is_flush()) { - flag_for(store->id()->hash()).flush_received(msg.id); - std::get<0>(output).try_put(msg); - } else if (accessor a; stores_.insert(a, store->id()->hash())) { - std::size_t const original_message_id{msg_counter_}; - generator g{msg.store, this->full_name(), new_level_name_}; - call(p, ufold, msg.store->id(), g, msg.eom, messages, std::make_index_sequence{}); - - message const flush_msg{g.flush_store(), msg.eom, ++msg_counter_, original_message_id}; - std::get<0>(output).try_put(flush_msg); - flag_for(store->id()->hash()).mark_as_processed(); - } - - if (done_with(store)) { - stores_.erase(store->id()->hash()); - } - }} + join_{make_join_or_none(g, std::make_index_sequence{})}, + unfold_{g, + concurrency, + [this, p = std::move(predicate), ufold = std::move(unfold)]( + messages_t const& messages, auto& output) { + auto const& msg = most_derived(messages); + auto const& store = msg.store; + if (store->is_flush()) { + flag_for(store->id()->hash()).flush_received(msg.id); + std::get<0>(output).try_put(msg); + } else if (accessor a; stores_.insert(a, store->id()->hash())) { + std::size_t const original_message_id{msg_counter_}; + generator g{msg.store, this->full_name(), new_level_name_}; + call(p, + ufold, + msg.store->id(), + g, + msg.eom, + messages, + std::make_index_sequence{}); + + message const flush_msg{ + g.flush_store(), msg.eom, ++msg_counter_, original_message_id}; + std::get<0>(output).try_put(flush_msg); + flag_for(store->id()->hash()).mark_as_processed(); + } + + if (done_with(store)) { + stores_.erase(store->id()->hash()); + } + }} { make_edge(join_, unfold_); } - ~unfold_node() { report_cached_stores(stores_); } + ~unfold_node() override { report_cached_stores(stores_); } + + unfold_node(unfold_node const&) = default; + unfold_node& operator=(unfold_node const&) = default; + + // NOLINTBEGIN(cppcoreguidelines-noexcept-move-operations,performance-noexcept-move-constructor) + unfold_node(unfold_node&&) = default; + unfold_node& operator=(unfold_node&&) = default; + // NOLINTEND(cppcoreguidelines-noexcept-move-operations,performance-noexcept-move-constructor) private: tbb::flow::receiver& port_for(specified_label const& product_label) override { - return receiver_for(join_, input(), product_label); + return receiver_for(join_, input(), product_label); + } + std::vector*> ports() override + { + return input_ports(join_); } - std::vector*> ports() override { return input_ports(join_); } tbb::flow::sender& sender() override { return output_port<0>(unfold_); } tbb::flow::sender& to_output() override { return sender(); } @@ -145,7 +161,7 @@ namespace phlex::experimental { level_id_ptr const& unfolded_id, generator& g, end_of_message_ptr const& eom, - messages_t const& messages, + messages_t const& messages, std::index_sequence) { ++calls_; @@ -166,7 +182,8 @@ namespace phlex::experimental { } ++product_count_; auto child = g.make_child_for(counter++, std::move(new_products)); - message const child_msg{child, eom->make_child(child->id()), ++msg_counter_}; + message const child_msg{ + .store = child, .eom = eom->make_child(child->id()), .id = ++msg_counter_}; output_port<0>(unfold_).try_put(child_msg); } } @@ -174,15 +191,15 @@ namespace phlex::experimental { std::size_t num_calls() const final { return calls_.load(); } std::size_t product_count() const final { return product_count_.load(); } - input_retriever_types input_{input_arguments()}; + input_retriever_types input_{input_arguments()}; qualified_names output_; std::string new_level_name_; - join_or_none_t join_; - tbb::flow::multifunction_node, messages_t<1u>> unfold_; + join_or_none_t join_; + tbb::flow::multifunction_node, messages_t<1u>> unfold_; tbb::concurrent_hash_map stores_; - std::atomic msg_counter_{}; // Is this sufficient? Probably not. - std::atomic calls_{}; - std::atomic product_count_{}; + std::atomic msg_counter_; // Is this sufficient? Probably not. + std::atomic calls_; + std::atomic product_count_; }; } diff --git a/phlex/core/detail/filter_impl.cpp b/phlex/core/detail/filter_impl.cpp index 4c576e23..15560ab0 100644 --- a/phlex/core/detail/filter_impl.cpp +++ b/phlex/core/detail/filter_impl.cpp @@ -1,7 +1,11 @@ #include "phlex/core/detail/filter_impl.hpp" +#include "phlex/core/specified_label.hpp" +#include "phlex/model/product_store.hpp" #include +#include #include +#include namespace { phlex::experimental::specified_label const output_dummy{phlex::experimental::qualified_name{ @@ -75,8 +79,9 @@ namespace phlex::experimental { // Fill slots in the order of the input arguments to the downstream node. for (std::size_t i = 0; i != nargs_; ++i) { - if (elem[i] or not store->contains_product((*product_names_)[i].name.full())) + if (elem[i] or not store->contains_product((*product_names_)[i].name.full())) { continue; + } elem[i] = store; } } diff --git a/phlex/core/detail/filter_impl.hpp b/phlex/core/detail/filter_impl.hpp index 2749a938..44f8d3d3 100644 --- a/phlex/core/detail/filter_impl.hpp +++ b/phlex/core/detail/filter_impl.hpp @@ -12,8 +12,8 @@ namespace phlex::experimental { struct predicate_result { end_of_message_ptr eom; - std::size_t msg_id; - bool result; + std::size_t msg_id{}; + bool result{false}; }; inline constexpr unsigned int true_value{-1u}; @@ -44,6 +44,7 @@ namespace phlex::experimental { bool claim(accessor& a, std::size_t msg_id); private: + // NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members) unsigned int const total_decisions_; decisions_t results_; }; @@ -58,10 +59,10 @@ namespace phlex::experimental { explicit data_map(for_output_t); explicit data_map(specified_labels const& product_names); - bool is_complete(std::size_t const msg_id) const; + bool is_complete(std::size_t msg_id) const; - void update(std::size_t const msg_id, product_store_const_ptr const& store); - std::vector release_data(std::size_t const msg_id); + void update(std::size_t msg_id, product_store_const_ptr const& store); + std::vector release_data(std::size_t msg_id); private: stores_t stores_; diff --git a/phlex/core/end_of_message.cpp b/phlex/core/end_of_message.cpp index 89a1ec73..b5c2867f 100644 --- a/phlex/core/end_of_message.cpp +++ b/phlex/core/end_of_message.cpp @@ -1,4 +1,5 @@ #include "phlex/core/end_of_message.hpp" +#include "phlex/core/fwd.hpp" #include "phlex/model/level_hierarchy.hpp" namespace phlex::experimental { diff --git a/phlex/core/end_of_message.hpp b/phlex/core/end_of_message.hpp index 9082c621..1684aa26 100644 --- a/phlex/core/end_of_message.hpp +++ b/phlex/core/end_of_message.hpp @@ -14,6 +14,13 @@ namespace phlex::experimental { end_of_message_ptr make_child(level_id_ptr id); ~end_of_message(); + // Default copy/move constructors and assignment operators are adequate + end_of_message(end_of_message const&) = default; + end_of_message(end_of_message&&) = default; + + end_of_message& operator=(end_of_message const&) = default; + end_of_message& operator=(end_of_message&&) = default; + private: end_of_message(end_of_message_ptr parent, level_hierarchy* hierarchy, level_id_ptr id); diff --git a/phlex/core/framework_graph.cpp b/phlex/core/framework_graph.cpp index 64520562..a2ccde09 100644 --- a/phlex/core/framework_graph.cpp +++ b/phlex/core/framework_graph.cpp @@ -11,6 +11,7 @@ #include #include +#include namespace phlex::experimental { level_sentry::level_sentry(flush_counters& counters, @@ -40,9 +41,9 @@ namespace phlex::experimental { } // FIXME: The algorithm below should support user-specified flush stores. - framework_graph::framework_graph(detail::next_store_t next_store, int const max_parallelism) : + framework_graph::framework_graph(detail::next_store_t f, int const max_parallelism) : parallelism_limit_{static_cast(max_parallelism)}, - driver_{std::move(next_store)}, + driver_{std::move(f)}, src_{graph_, [this](tbb::flow_control& fc) mutable -> message { auto item = driver_(); diff --git a/phlex/core/framework_graph.hpp b/phlex/core/framework_graph.hpp index 18aa00cc..de4c6ddd 100644 --- a/phlex/core/framework_graph.hpp +++ b/phlex/core/framework_graph.hpp @@ -39,8 +39,10 @@ namespace phlex::experimental { std::size_t depth() const noexcept; private: + // NOLINTBEGIN(cppcoreguidelines-avoid-const-or-ref-data-members) flush_counters& counters_; message_sender& sender_; + // NOLINTEND(cppcoreguidelines-avoid-const-or-ref-data-members) product_store_ptr store_; std::size_t depth_; }; @@ -131,15 +133,15 @@ namespace phlex::experimental { void drain(); std::size_t original_message_id(product_store_ptr const& store); - resource_usage graph_resource_usage_{}; + resource_usage graph_resource_usage_; max_allowed_parallelism parallelism_limit_; - level_hierarchy hierarchy_{}; + level_hierarchy hierarchy_; node_catalog nodes_{}; - std::map filters_{}; + std::map filters_; // The graph_ object uses the filters_, nodes_, and hierarchy_ objects implicitly. - tbb::flow::graph graph_{}; + tbb::flow::graph graph_; framework_driver driver_; - std::vector registration_errors_{}; + std::vector registration_errors_; tbb::flow::input_node src_; multiplexer multiplexer_; std::stack eoms_; diff --git a/phlex/core/glue.hpp b/phlex/core/glue.hpp index 7f0cfdbb..8941f0f3 100644 --- a/phlex/core/glue.hpp +++ b/phlex/core/glue.hpp @@ -36,7 +36,7 @@ namespace phlex::experimental { std::shared_ptr bound_obj, std::vector& errors, configuration const* config = nullptr) : - graph_{g}, nodes_{nodes}, bound_obj_{std::move(bound_obj)}, errors_{errors}, config_{config} + graph_{g}, nodes_{nodes}, errors_{errors}, bound_obj_{std::move(bound_obj)}, config_{config} { } @@ -135,10 +135,12 @@ namespace phlex::experimental { } private: + // NOLINTBEGIN(cppcoreguidelines-avoid-const-or-ref-data-members) tbb::flow::graph& graph_; node_catalog& nodes_; - std::shared_ptr bound_obj_; std::vector& errors_; + // NOLINTEND(cppcoreguidelines-avoid-const-or-ref-data-members) + std::shared_ptr bound_obj_; configuration const* config_; }; } diff --git a/phlex/core/graph_proxy.hpp b/phlex/core/graph_proxy.hpp index 8e5239d8..6584d2ad 100644 --- a/phlex/core/graph_proxy.hpp +++ b/phlex/core/graph_proxy.hpp @@ -32,7 +32,7 @@ namespace phlex::experimental { node_catalog& nodes, std::vector& errors) requires(std::same_as) - : config_{&config}, graph_{g}, nodes_{nodes}, errors_{errors} + : graph_{g}, nodes_{nodes}, errors_{errors}, config_{&config} { } @@ -99,7 +99,7 @@ namespace phlex::experimental { std::shared_ptr bound_obj, std::vector& errors) requires(not std::same_as) - : config_{config}, graph_{g}, nodes_{nodes}, bound_obj_{bound_obj}, errors_{errors} + : graph_{g}, nodes_{nodes}, errors_{errors}, bound_obj_{std::move(bound_obj)}, config_{config} { } @@ -108,11 +108,13 @@ namespace phlex::experimental { return glue{graph_, nodes_, (use_bound_object ? bound_obj_ : nullptr), errors_, config_}; } - configuration const* config_; + // NOLINTBEGIN(cppcoreguidelines-avoid-const-or-ref-data-members) tbb::flow::graph& graph_; node_catalog& nodes_; - std::shared_ptr bound_obj_; std::vector& errors_; + // NOLINTEND(cppcoreguidelines-avoid-const-or-ref-data-members) + std::shared_ptr bound_obj_; + configuration const* config_; }; } diff --git a/phlex/core/input_arguments.hpp b/phlex/core/input_arguments.hpp index 864364a7..f2abf482 100644 --- a/phlex/core/input_arguments.hpp +++ b/phlex/core/input_arguments.hpp @@ -50,9 +50,10 @@ namespace phlex::experimental { template auto form_input_arguments(std::string const& algorithm_name, specified_labels const& args) { - constexpr auto N = std::tuple_size_v; + constexpr auto number_input_types = std::tuple_size_v; detail::verify_no_duplicate_input_products(algorithm_name, args); - return form_input_arguments_impl(args, std::make_index_sequence{}); + return form_input_arguments_impl(args, + std::make_index_sequence{}); } } diff --git a/phlex/core/message.cpp b/phlex/core/message.cpp index a2718c66..f9f9b5cf 100644 --- a/phlex/core/message.cpp +++ b/phlex/core/message.cpp @@ -8,7 +8,7 @@ namespace phlex::experimental { - std::size_t MessageHasher::operator()(message const& msg) const noexcept { return msg.id; } + std::size_t message_hasher::operator()(message const& msg) const noexcept { return msg.id; } message const& more_derived(message const& a, message const& b) { @@ -30,7 +30,7 @@ namespace phlex::experimental { return std::distance(b, it); } - detail::no_join::no_join(tbb::flow::graph& g, MessageHasher) : + detail::no_join::no_join(tbb::flow::graph& g, message_hasher) : no_join_base_t{g, tbb::flow::unlimited, [](message const& msg) { return std::tuple{msg}; }} { } diff --git a/phlex/core/message.hpp b/phlex/core/message.hpp index 6e7d3a3c..539d88f3 100644 --- a/phlex/core/message.hpp +++ b/phlex/core/message.hpp @@ -20,20 +20,25 @@ namespace phlex::experimental { struct message { product_store_const_ptr store; end_of_message_ptr eom; - std::size_t id; + std::size_t id{}; std::size_t original_id{-1ull}; // Used during flush }; template using messages_t = sized_tuple; - struct MessageHasher { + struct message_hasher { std::size_t operator()(message const& msg) const noexcept; }; // Overload for use with most_derived message const& more_derived(message const& a, message const& b); + // Delete overloads that can result in dangling reference + message const& more_derived(message&&, message&&) = delete; + message const& more_derived(message const&, message&&) = delete; + message const& more_derived(message&&, message const&) = delete; + namespace detail { template using join_messages_t = tbb::flow::join_node, tbb::flow::tag_matching>; @@ -41,7 +46,7 @@ namespace phlex::experimental { tbb::flow::function_node, tbb::flow::lightweight>; struct no_join : no_join_base_t { - no_join(tbb::flow::graph& g, MessageHasher); + no_join(tbb::flow::graph& g, message_hasher); }; } @@ -51,7 +56,7 @@ namespace phlex::experimental { template auto make_join_or_none(tbb::flow::graph& g, std::index_sequence) { - return join_or_none_t{g, type_t{}...}; + return join_or_none_t{g, type_t{}...}; } template diff --git a/phlex/core/message_sender.hpp b/phlex/core/message_sender.hpp index 2ef9162f..eaf7dc46 100644 --- a/phlex/core/message_sender.hpp +++ b/phlex/core/message_sender.hpp @@ -23,9 +23,11 @@ namespace phlex::experimental { private: std::size_t original_message_id(product_store_ptr const& store); + // NOLINTBEGIN(cppcoreguidelines-avoid-const-or-ref-data-members) level_hierarchy& hierarchy_; multiplexer& multiplexer_; std::stack& eoms_; + // NOLINTEND(cppcoreguidelines-avoid-const-or-ref-data-members) std::map original_message_ids_; std::size_t calls_{}; }; diff --git a/phlex/core/multiplexer.hpp b/phlex/core/multiplexer.hpp index 2ba355b0..855ca678 100644 --- a/phlex/core/multiplexer.hpp +++ b/phlex/core/multiplexer.hpp @@ -20,11 +20,11 @@ namespace phlex::experimental { using base = tbb::flow::function_node; public: - ~multiplexer(); + ~multiplexer() final; struct named_input_port { specified_label product_label; - tbb::flow::receiver* port; + tbb::flow::receiver* port{nullptr}; }; using named_input_ports_t = std::vector; using head_ports_t = std::map; @@ -39,7 +39,7 @@ namespace phlex::experimental { private: head_ports_t head_ports_; bool debug_; - std::atomic received_messages_{}; + std::atomic received_messages_; std::chrono::duration execution_time_{}; }; diff --git a/phlex/core/node_catalog.hpp b/phlex/core/node_catalog.hpp index 36123c4e..b17504cf 100644 --- a/phlex/core/node_catalog.hpp +++ b/phlex/core/node_catalog.hpp @@ -26,12 +26,12 @@ namespace phlex::experimental { std::size_t execution_counts(std::string const& node_name) const; std::size_t product_counts(std::string const& node_name) const; - simple_ptr_map predicates{}; - simple_ptr_map observers{}; - simple_ptr_map outputs{}; - simple_ptr_map folds{}; - simple_ptr_map unfolds{}; - simple_ptr_map transforms{}; + simple_ptr_map predicates; + simple_ptr_map observers; + simple_ptr_map outputs; + simple_ptr_map folds; + simple_ptr_map unfolds; + simple_ptr_map transforms; }; } diff --git a/phlex/core/products_consumer.cpp b/phlex/core/products_consumer.cpp index c0ed2b91..6a08d902 100644 --- a/phlex/core/products_consumer.cpp +++ b/phlex/core/products_consumer.cpp @@ -1,4 +1,10 @@ #include "phlex/core/products_consumer.hpp" +#include "phlex/core/specified_label.hpp" +#include "phlex/model/algorithm_name.hpp" + +#include +#include +#include namespace phlex::experimental { diff --git a/phlex/core/registrar.hpp b/phlex/core/registrar.hpp index 92ef9e55..b23de96f 100644 --- a/phlex/core/registrar.hpp +++ b/phlex/core/registrar.hpp @@ -63,11 +63,11 @@ namespace phlex::experimental { template class registrar { - using Nodes = simple_ptr_map; + using nodes_t = simple_ptr_map; using node_creator = std::function, std::vector)>; public: - explicit registrar(Nodes& nodes, std::vector& errors) : + explicit registrar(nodes_t& nodes, std::vector& errors) : nodes_{&nodes}, errors_{&errors} { } @@ -116,11 +116,11 @@ namespace phlex::experimental { } } - Nodes* nodes_; + nodes_t* nodes_; std::vector* errors_; node_creator creator_{}; std::optional> predicates_; - std::vector output_products_{}; + std::vector output_products_; }; } diff --git a/phlex/core/registration_api.hpp b/phlex/core/registration_api.hpp index 86a180f1..f5182fc6 100644 --- a/phlex/core/registration_api.hpp +++ b/phlex/core/registration_api.hpp @@ -24,10 +24,10 @@ namespace phlex::experimental { template