From 9d88f0e9e3559e0aa1d86428b4820c5e0bbdb108 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Tue, 21 Apr 2020 14:54:53 +0200 Subject: [PATCH 01/25] * updated interface to SubscriptionIntraProcessBase * implemented interface to deserialize a given serialized message to ros message Signed-off-by: Joshua Hampp --- .../subscription_intra_process.hpp | 29 +++++++++++++++++++ .../subscription_intra_process_base.hpp | 9 ++++++ 2 files changed, 38 insertions(+) diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 618db3cac1..9c7807d12f 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -28,6 +28,8 @@ #include "rclcpp/experimental/buffers/intra_process_buffer.hpp" #include "rclcpp/experimental/create_intra_process_buffer.hpp" #include "rclcpp/experimental/subscription_intra_process_base.hpp" +#include "rclcpp/serialization.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/waitable.hpp" #include "tracetools/tracetools.h" @@ -72,6 +74,12 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase throw std::runtime_error("SubscriptionIntraProcess wrong callback type"); } + if (!allocator) { + message_allocator_ = std::make_shared(); + } else { + message_allocator_ = std::make_shared(*allocator.get()); + } + // Create the intra-process buffer. buffer_ = rclcpp::experimental::create_intra_process_buffer( buffer_type, @@ -102,6 +110,12 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase #endif } + bool + is_serialized() const + { + return rclcpp::is_serialized_message_class::value; + } + bool is_ready(rcl_wait_set_t * wait_set) { @@ -134,6 +148,20 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase return buffer_->use_take_shared_method(); } + void + provide_serialized_intra_process_message(const SerializedMessage & serialized_message) + { + rclcpp::Serialization serialization; + + auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); + MessageAllocTraits::construct(*message_allocator_.get(), ptr); + auto message = MessageUniquePtr(ptr); + + serialization.deserialize_message(serialized_message, *message); + + provide_intra_process_message(std::move(message)); + } + private: void trigger_guard_condition() @@ -168,6 +196,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase AnySubscriptionCallback any_callback_; BufferUniquePtr buffer_; + std::shared_ptr message_allocator_; }; } // namespace experimental diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp index 7afd68abef..97ac402c83 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp @@ -25,6 +25,8 @@ #include "rcl/error_handling.h" +#include "rclcpp/serialization.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/waitable.hpp" @@ -70,6 +72,13 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable rmw_qos_profile_t get_actual_qos() const; + virtual bool + is_serialized() const = 0; + + virtual void + provide_serialized_intra_process_message( + const SerializedMessage & serialized_message) = 0; + protected: std::recursive_mutex reentrant_mutex_; rcl_guard_condition_t gc_; From 1dad3ad9610f95d4b1ef6c304057f90e0835c8f4 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Tue, 21 Apr 2020 14:56:30 +0200 Subject: [PATCH 02/25] added support for SerializedMessage in Intraprocessmanger, so: * serialized publisher uses correct method to forward content to subscriber * non-serialized publisher can serialize message for serialized subscriber Signed-off-by: Joshua Hampp --- .../experimental/intra_process_manager.hpp | 97 +++++++++++++++---- 1 file changed, 78 insertions(+), 19 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index d671de701e..d420248f93 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -37,6 +37,8 @@ #include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/publisher_base.hpp" +#include "rclcpp/serialization.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/visibility_control.hpp" namespace rclcpp @@ -356,6 +358,8 @@ class IntraProcessManager std::shared_ptr message, std::vector subscription_ids) { + constexpr bool is_serialized_publisher = rclcpp::is_serialized_message_class::value; + for (auto id : subscription_ids) { auto subscription_it = subscriptions_.find(id); if (subscription_it == subscriptions_.end()) { @@ -363,11 +367,28 @@ class IntraProcessManager } auto subscription_base = subscription_it->second.subscription; - auto subscription = std::static_pointer_cast< - rclcpp::experimental::SubscriptionIntraProcess - >(subscription_base); + // same message type, so it can just be passed through + if (subscription_base->is_serialized() == is_serialized_publisher) { + auto subscription = std::static_pointer_cast< + rclcpp::experimental::SubscriptionIntraProcess + >(subscription_base); + + subscription->provide_intra_process_message(message); + } else if (is_serialized_publisher) { + // publisher provides a serialized message, while subscriber expects a ROS message + provide_serialized_intra_process_message(subscription_base, *message); + } else { + // publisher provides a ROS message, while subscriber expects a serialized message + rclcpp::Serialization serialization; + auto subscription = std::static_pointer_cast< + rclcpp::experimental::SubscriptionIntraProcess + >(subscription_base); + + auto serialized_message = std::make_unique(); + serialization.serialize_message(*message, *serialized_message); - subscription->provide_intra_process_message(message); + subscription->provide_intra_process_message(std::move(serialized_message)); + } } } @@ -384,6 +405,8 @@ class IntraProcessManager using MessageAllocTraits = allocator::AllocRebind; using MessageUniquePtr = std::unique_ptr; + constexpr bool is_serialized_publisher = rclcpp::is_serialized_message_class::value; + for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) { auto subscription_it = subscriptions_.find(*it); if (subscription_it == subscriptions_.end()) { @@ -391,26 +414,62 @@ class IntraProcessManager } auto subscription_base = subscription_it->second.subscription; - auto subscription = std::static_pointer_cast< - rclcpp::experimental::SubscriptionIntraProcess - >(subscription_base); - - if (std::next(it) == subscription_ids.end()) { - // If this is the last subscription, give up ownership - subscription->provide_intra_process_message(std::move(message)); + // same message type, so it can just be passed through + if (subscription_base->is_serialized() == is_serialized_publisher) { + auto subscription = std::static_pointer_cast< + rclcpp::experimental::SubscriptionIntraProcess + >(subscription_base); + + if (std::next(it) == subscription_ids.end()) { + // If this is the last subscription, give up ownership + subscription->provide_intra_process_message(std::move(message)); + } else { + // Copy the message since we have additional subscriptions to serve + MessageUniquePtr copy_message; + Deleter deleter = message.get_deleter(); + auto ptr = MessageAllocTraits::allocate(*allocator.get(), 1); + MessageAllocTraits::construct(*allocator.get(), ptr, *message); + copy_message = MessageUniquePtr(ptr, deleter); + + subscription->provide_intra_process_message(std::move(copy_message)); + } + } else if (is_serialized_publisher) { + // publisher provides a serialized message, while subscriber expects a ROS message + provide_serialized_intra_process_message(subscription_base, *message); } else { - // Copy the message since we have additional subscriptions to serve - MessageUniquePtr copy_message; - Deleter deleter = message.get_deleter(); - auto ptr = MessageAllocTraits::allocate(*allocator.get(), 1); - MessageAllocTraits::construct(*allocator.get(), ptr, *message); - copy_message = MessageUniquePtr(ptr, deleter); - - subscription->provide_intra_process_message(std::move(copy_message)); + // publisher provides a ROS message, while subscriber expects a serialized message + rclcpp::Serialization serialization; + auto subscription = std::static_pointer_cast< + rclcpp::experimental::SubscriptionIntraProcess + >(subscription_base); + + auto serialized_message = std::make_unique(); + serialization.serialize_message(*message, *serialized_message); + + subscription->provide_intra_process_message(std::move(serialized_message)); } } } + template + static std::enable_if_t::value> + provide_serialized_intra_process_message( + rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr & subscription, + const T & serialized_message) + { + (void)subscription; + (void)serialized_message; + // prevent call if actual message type does not match + } + + static void + provide_serialized_intra_process_message( + rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr & subscription, + const SerializedMessage & serialized_message) + { + subscription->provide_serialized_intra_process_message(serialized_message); + } + PublisherToSubscriptionIdsMap pub_to_subs_; SubscriptionMap subscriptions_; PublisherMap publishers_; From c552c0bef29b71ae2d0e784fe6781a21a5f3dd09 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Tue, 21 Apr 2020 14:56:58 +0200 Subject: [PATCH 03/25] updated unit test for intraprocess manager to match new interface Signed-off-by: Joshua Hampp --- rclcpp/test/test_intra_process_manager.cpp | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/rclcpp/test/test_intra_process_manager.cpp b/rclcpp/test/test_intra_process_manager.cpp index b71fb1d061..c6a981f964 100644 --- a/rclcpp/test/test_intra_process_manager.cpp +++ b/rclcpp/test/test_intra_process_manager.cpp @@ -23,6 +23,8 @@ #define RCLCPP_BUILDING_LIBRARY 1 #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/macros.hpp" +#include "rclcpp/serialization.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/qos.hpp" #include "rmw/types.h" #include "rmw/qos_profiles.h" @@ -212,6 +214,18 @@ class SubscriptionIntraProcessBase return topic_name; } + bool + is_serialized() const + { + return false; + } + + void + provide_serialized_intra_process_message(const rclcpp::SerializedMessage & serialized_message) + { + (void)serialized_message; + } + rmw_qos_profile_t qos_profile; const char * topic_name; }; From 2513cc2e1599f03c7f383f295105bd2a023ccab2 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Wed, 22 Apr 2020 05:54:32 +0200 Subject: [PATCH 04/25] updated is_serialized_message_class traits Signed-off-by: Joshua Hampp --- .../include/rclcpp/experimental/intra_process_manager.hpp | 8 +++++--- .../rclcpp/experimental/subscription_intra_process.hpp | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index d420248f93..072a53078a 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -358,7 +358,8 @@ class IntraProcessManager std::shared_ptr message, std::vector subscription_ids) { - constexpr bool is_serialized_publisher = rclcpp::is_serialized_message_class::value; + constexpr bool is_serialized_publisher = + serialization_traits::is_serialized_message_class::value; for (auto id : subscription_ids) { auto subscription_it = subscriptions_.find(id); @@ -405,7 +406,8 @@ class IntraProcessManager using MessageAllocTraits = allocator::AllocRebind; using MessageUniquePtr = std::unique_ptr; - constexpr bool is_serialized_publisher = rclcpp::is_serialized_message_class::value; + constexpr bool is_serialized_publisher = + serialization_traits::is_serialized_message_class::value; for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) { auto subscription_it = subscriptions_.find(*it); @@ -452,7 +454,7 @@ class IntraProcessManager } template - static std::enable_if_t::value> + static std::enable_if_t::value> provide_serialized_intra_process_message( rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr & subscription, const T & serialized_message) diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 9c7807d12f..2b6573287a 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -113,7 +113,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase bool is_serialized() const { - return rclcpp::is_serialized_message_class::value; + return serialization_traits::is_serialized_message_class::value; } bool From fd6a20ec8e2bcc2afed7f84d1a38be8bdcf7f33c Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Wed, 22 Apr 2020 05:58:39 +0200 Subject: [PATCH 05/25] renamed provide_serialized_intra_process_message to provide_intra_process_message Signed-off-by: Joshua Hampp --- .../rclcpp/experimental/intra_process_manager.hpp | 10 +++++----- .../rclcpp/experimental/subscription_intra_process.hpp | 2 +- .../experimental/subscription_intra_process_base.hpp | 2 +- rclcpp/test/test_intra_process_manager.cpp | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index 072a53078a..757341d421 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -377,7 +377,7 @@ class IntraProcessManager subscription->provide_intra_process_message(message); } else if (is_serialized_publisher) { // publisher provides a serialized message, while subscriber expects a ROS message - provide_serialized_intra_process_message(subscription_base, *message); + provide_intra_process_message(subscription_base, *message); } else { // publisher provides a ROS message, while subscriber expects a serialized message rclcpp::Serialization serialization; @@ -437,7 +437,7 @@ class IntraProcessManager } } else if (is_serialized_publisher) { // publisher provides a serialized message, while subscriber expects a ROS message - provide_serialized_intra_process_message(subscription_base, *message); + provide_intra_process_message(subscription_base, *message); } else { // publisher provides a ROS message, while subscriber expects a serialized message rclcpp::Serialization serialization; @@ -455,7 +455,7 @@ class IntraProcessManager template static std::enable_if_t::value> - provide_serialized_intra_process_message( + provide_intra_process_message( rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr & subscription, const T & serialized_message) { @@ -465,11 +465,11 @@ class IntraProcessManager } static void - provide_serialized_intra_process_message( + provide_intra_process_message( rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr & subscription, const SerializedMessage & serialized_message) { - subscription->provide_serialized_intra_process_message(serialized_message); + subscription->provide_intra_process_message(serialized_message); } PublisherToSubscriptionIdsMap pub_to_subs_; diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 2b6573287a..059ff5e1c8 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -149,7 +149,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase } void - provide_serialized_intra_process_message(const SerializedMessage & serialized_message) + provide_intra_process_message(const SerializedMessage & serialized_message) { rclcpp::Serialization serialization; diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp index 97ac402c83..84af55fa0c 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp @@ -76,7 +76,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable is_serialized() const = 0; virtual void - provide_serialized_intra_process_message( + provide_intra_process_message( const SerializedMessage & serialized_message) = 0; protected: diff --git a/rclcpp/test/test_intra_process_manager.cpp b/rclcpp/test/test_intra_process_manager.cpp index c6a981f964..f7419e08f3 100644 --- a/rclcpp/test/test_intra_process_manager.cpp +++ b/rclcpp/test/test_intra_process_manager.cpp @@ -221,7 +221,7 @@ class SubscriptionIntraProcessBase } void - provide_serialized_intra_process_message(const rclcpp::SerializedMessage & serialized_message) + provide_intra_process_message(const rclcpp::SerializedMessage & serialized_message) { (void)serialized_message; } From f0c8dd31c3b1782e2a17d229553bac178c26a489 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Wed, 22 Apr 2020 06:06:35 +0200 Subject: [PATCH 06/25] fixed Serialization class for non-supported type (yes this is actually needed for the next PR) Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/serialization.hpp | 28 +++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/rclcpp/include/rclcpp/serialization.hpp b/rclcpp/include/rclcpp/serialization.hpp index 53fe59dc8b..9666a81a5e 100644 --- a/rclcpp/include/rclcpp/serialization.hpp +++ b/rclcpp/include/rclcpp/serialization.hpp @@ -97,6 +97,34 @@ class Serialization : public SerializationBase } }; +/// Specialized serialization for rcl_serialized_message_t (because type support is not defined) +template<> +class Serialization: public SerializationBase +{ +public: + /// Constructor of Serialization + Serialization() + : SerializationBase(nullptr) + { + throw std::runtime_error( + "Serialization of rcl_serialized_message_t to serialized message is not possible."); + } +}; + +/// Specialized serialization for SerializedMessage (because type support is not defined) +template<> +class Serialization: public SerializationBase +{ +public: + /// Constructor of Serialization + Serialization() + : SerializationBase(nullptr) + { + throw std::runtime_error( + "Serialization of SerializedMessage to serialized message is not possible."); + } +}; + } // namespace rclcpp #endif // RCLCPP__SERIALIZATION_HPP_ From facb43d7df4069a4ae8cab951f408e4d3bd1f681 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Wed, 22 Apr 2020 06:06:59 +0200 Subject: [PATCH 07/25] updated to (de)seriaizle method Signed-off-by: Joshua Hampp --- .../include/rclcpp/experimental/intra_process_manager.hpp | 6 ++++-- .../rclcpp/experimental/subscription_intra_process.hpp | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index 757341d421..a2d765ce64 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -386,7 +386,8 @@ class IntraProcessManager >(subscription_base); auto serialized_message = std::make_unique(); - serialization.serialize_message(*message, *serialized_message); + serialization.serialize_message( + reinterpret_cast(message.get()), serialized_message.get()); subscription->provide_intra_process_message(std::move(serialized_message)); } @@ -446,7 +447,8 @@ class IntraProcessManager >(subscription_base); auto serialized_message = std::make_unique(); - serialization.serialize_message(*message, *serialized_message); + serialization.serialize_message( + reinterpret_cast(message.get()), serialized_message.get()); subscription->provide_intra_process_message(std::move(serialized_message)); } diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 059ff5e1c8..e7af4cef54 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -157,7 +157,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase MessageAllocTraits::construct(*message_allocator_.get(), ptr); auto message = MessageUniquePtr(ptr); - serialization.deserialize_message(serialized_message, *message); + serialization.deserialize_message(&serialized_message, reinterpret_cast(message.get())); provide_intra_process_message(std::move(message)); } From 08a3251e83627f56e8711696d9a4dd5694cb5121 Mon Sep 17 00:00:00 2001 From: Dirk Thomas Date: Tue, 21 Apr 2020 08:57:31 -0700 Subject: [PATCH 08/25] fix build regression (#1078) Signed-off-by: Dirk Thomas --- rclcpp/include/rclcpp/node_interfaces/node_base.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/include/rclcpp/node_interfaces/node_base.hpp b/rclcpp/include/rclcpp/node_interfaces/node_base.hpp index c21514ff97..9816325bff 100644 --- a/rclcpp/include/rclcpp/node_interfaces/node_base.hpp +++ b/rclcpp/include/rclcpp/node_interfaces/node_base.hpp @@ -43,7 +43,7 @@ class NodeBase : public NodeBaseInterface rclcpp::Context::SharedPtr context, const rcl_node_options_t & rcl_node_options, bool use_intra_process_default, - bool enable_topic_statistics_default); + bool enable_topic_statistics_default=false); RCLCPP_PUBLIC virtual From 0577f390242de78eeec1ae75fda9be7193995e47 Mon Sep 17 00:00:00 2001 From: Prajakta Gokhale Date: Tue, 21 Apr 2020 14:14:47 -0700 Subject: [PATCH 09/25] Reflect changes in rclcpp API (#1079) * Reflect changes in rclcpp API Signed-off-by: Prajakta Gokhale * Revert earlier fix made in rclcpp Signed-off-by: Prajakta Gokhale --- rclcpp/include/rclcpp/node_interfaces/node_base.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/include/rclcpp/node_interfaces/node_base.hpp b/rclcpp/include/rclcpp/node_interfaces/node_base.hpp index 9816325bff..c21514ff97 100644 --- a/rclcpp/include/rclcpp/node_interfaces/node_base.hpp +++ b/rclcpp/include/rclcpp/node_interfaces/node_base.hpp @@ -43,7 +43,7 @@ class NodeBase : public NodeBaseInterface rclcpp::Context::SharedPtr context, const rcl_node_options_t & rcl_node_options, bool use_intra_process_default, - bool enable_topic_statistics_default=false); + bool enable_topic_statistics_default); RCLCPP_PUBLIC virtual From 9ae2da19538521bcf9e0b90034ceda23f62b4a48 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Tue, 21 Apr 2020 15:03:02 +0200 Subject: [PATCH 10/25] extended publisher: * added constructor with typesupport * added more generic do_inter_process_publish Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/generic_publisher.hpp | 67 +++++++++++++++++++++ rclcpp/include/rclcpp/publisher.hpp | 37 ++++++------ 2 files changed, 86 insertions(+), 18 deletions(-) create mode 100644 rclcpp/include/rclcpp/generic_publisher.hpp diff --git a/rclcpp/include/rclcpp/generic_publisher.hpp b/rclcpp/include/rclcpp/generic_publisher.hpp new file mode 100644 index 0000000000..4ec713be96 --- /dev/null +++ b/rclcpp/include/rclcpp/generic_publisher.hpp @@ -0,0 +1,67 @@ +// Copyright 2014 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__GENERIC_PUBLISHER_HPP_ +#define RCLCPP__GENERIC_PUBLISHER_HPP_ + +#include "rcl/error_handling.h" +#include "rcl/publisher.h" + +#include "rclcpp/exceptions.hpp" +#include "rclcpp/serialized_message.hpp" + +namespace rclcpp +{ + +namespace generic_publisher +{ + +template +inline void +do_inter_process_publish(rcl_publisher_t & publisher_handle, const MessageT & msg) +{ + auto status = rcl_publish(&publisher_handle, &msg, nullptr); + + if (RCL_RET_PUBLISHER_INVALID == status) { + rcl_reset_error(); // next call will reset error message if not context + if (rcl_publisher_is_valid_except_context(&publisher_handle)) { + rcl_context_t * context = rcl_publisher_get_context(&publisher_handle); + if (nullptr != context && !rcl_context_is_valid(context)) { + // publisher is invalid due to context being shutdown + return; + } + } + } + if (RCL_RET_OK != status) { + rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish message"); + } +} + +template<> +inline void +do_inter_process_publish( + rcl_publisher_t & publisher_handle, + const SerializedMessage & serialized_msg) +{ + auto status = rcl_publish_serialized_message(&publisher_handle, &serialized_msg, nullptr); + if (RCL_RET_OK != status) { + rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message"); + } +} + +} // namespace generic_publisher + +} // namespace rclcpp + +#endif // RCLCPP__GENERIC_PUBLISHER_HPP_ diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index aa614e3a0c..2f717b3b45 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -32,6 +32,7 @@ #include "rclcpp/allocator/allocator_deleter.hpp" #include "rclcpp/detail/resolve_use_intra_process.hpp" #include "rclcpp/experimental/intra_process_manager.hpp" +#include "rclcpp/generic_publisher.hpp" #include "rclcpp/loaned_message.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" @@ -63,11 +64,12 @@ class Publisher : public PublisherBase rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic, const rclcpp::QoS & qos, - const rclcpp::PublisherOptionsWithAllocator & options) + const rclcpp::PublisherOptionsWithAllocator & options, + const rosidl_message_type_support_t & type_support) : PublisherBase( node_base, topic, - *rosidl_typesupport_cpp::get_message_type_support_handle(), + type_support, options.template to_rcl_publisher_options(qos)), options_(options), message_allocator_(new MessageAllocator(*options.get_allocator().get())) @@ -103,6 +105,19 @@ class Publisher : public PublisherBase // Setup continues in the post construction method, post_init_setup(). } + Publisher( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic, + const rclcpp::QoS & qos, + const rclcpp::PublisherOptionsWithAllocator & options) + : Publisher( + node_base, + topic, + qos, + options, + *rosidl_typesupport_cpp::get_message_type_support_handle()) + {} + /// Called post construction, so that construction may continue after shared_from_this() works. virtual void @@ -259,24 +274,10 @@ class Publisher : public PublisherBase } protected: - void + virtual void do_inter_process_publish(const MessageT & msg) { - auto status = rcl_publish(&publisher_handle_, &msg, nullptr); - - if (RCL_RET_PUBLISHER_INVALID == status) { - rcl_reset_error(); // next call will reset error message if not context - if (rcl_publisher_is_valid_except_context(&publisher_handle_)) { - rcl_context_t * context = rcl_publisher_get_context(&publisher_handle_); - if (nullptr != context && !rcl_context_is_valid(context)) { - // publisher is invalid due to context being shutdown - return; - } - } - } - if (RCL_RET_OK != status) { - rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish message"); - } + generic_publisher::do_inter_process_publish(publisher_handle_, msg); } void From 9490d9f30112c75c608b3281d7232292c7bed2d8 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Tue, 21 Apr 2020 15:03:47 +0200 Subject: [PATCH 11/25] added create_generic_subscription and create_generic_publisher Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/create_publisher.hpp | 40 +++++++++++++ rclcpp/include/rclcpp/create_subscription.hpp | 47 ++++++++++++++++ rclcpp/include/rclcpp/publisher_factory.hpp | 29 ++++++++++ .../include/rclcpp/subscription_factory.hpp | 56 +++++++++++++++++++ 4 files changed, 172 insertions(+) diff --git a/rclcpp/include/rclcpp/create_publisher.hpp b/rclcpp/include/rclcpp/create_publisher.hpp index 811c18b69f..433077cd7d 100644 --- a/rclcpp/include/rclcpp/create_publisher.hpp +++ b/rclcpp/include/rclcpp/create_publisher.hpp @@ -29,6 +29,46 @@ namespace rclcpp { +/// Create and return a publisher of the given MessageT type. +/** + * The NodeT type only needs to have a method called get_node_topics_interface() + * which returns a shared_ptr to a NodeTopicsInterface. + */ +template< + typename MessageT, + typename AllocatorT = std::allocator, + typename PublisherT = rclcpp::Publisher, + typename NodeT> +std::shared_ptr +create_generic_publisher( + NodeT & node, + const std::string & topic_name, + const rosidl_message_type_support_t & type_support, + const rclcpp::QoS & qos, + const rclcpp::PublisherOptionsWithAllocator & options = ( + rclcpp::PublisherOptionsWithAllocator() + ) +) +{ + // Extract the NodeTopicsInterface from the NodeT. + using rclcpp::node_interfaces::get_node_topics_interface; + auto node_topics = get_node_topics_interface(node); + + // Create the publisher. + auto pub = node_topics->create_publisher( + topic_name, + rclcpp::create_generic_publisher_factory( + options, + type_support), + qos + ); + + // Add the publisher to the node topics interface. + node_topics->add_publisher(pub, options.callback_group); + + return std::dynamic_pointer_cast(pub); +} + /// Create and return a publisher of the given MessageT type. /** * The NodeT type only needs to have a method called get_node_topics_interface() diff --git a/rclcpp/include/rclcpp/create_subscription.hpp b/rclcpp/include/rclcpp/create_subscription.hpp index 9248254047..d2cdd2bc51 100644 --- a/rclcpp/include/rclcpp/create_subscription.hpp +++ b/rclcpp/include/rclcpp/create_subscription.hpp @@ -29,6 +29,53 @@ namespace rclcpp { +/// Create and return a subscription of the given MessageT type. +/** + * The NodeT type only needs to have a method called get_node_topics_interface() + * which returns a shared_ptr to a NodeTopicsInterface, or be a + * NodeTopicsInterface pointer itself. + */ +template< + typename MessageT, + typename CallbackT, + typename AllocatorT = std::allocator, + typename CallbackMessageT = + typename rclcpp::subscription_traits::has_message_type::type, + typename SubscriptionT = rclcpp::Subscription, + typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy< + CallbackMessageT, + AllocatorT + >, + typename NodeT> +typename std::shared_ptr +create_generic_subscription( + NodeT && node, + const std::string & topic_name, + const rosidl_message_type_support_t & type_support, + const rclcpp::QoS & qos, + CallbackT && callback, + const rclcpp::SubscriptionOptionsWithAllocator & options = + rclcpp::SubscriptionOptionsWithAllocator(), + typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = + MessageMemoryStrategyT::create_default() +) +{ + using rclcpp::node_interfaces::get_node_topics_interface; + auto node_topics = get_node_topics_interface(std::forward(node)); + + auto factory = rclcpp::create_generic_subscription_factory( + std::forward(callback), + options, + msg_mem_strat, + type_support + ); + + auto sub = node_topics->create_subscription(topic_name, factory, qos); + node_topics->add_subscription(sub, options.callback_group); + + return std::dynamic_pointer_cast(sub); +} + /// Create and return a subscription of the given MessageT type. /** * The NodeT type only needs to have a method called get_node_topics_interface() diff --git a/rclcpp/include/rclcpp/publisher_factory.hpp b/rclcpp/include/rclcpp/publisher_factory.hpp index 87def3cc17..ac4f729ed2 100644 --- a/rclcpp/include/rclcpp/publisher_factory.hpp +++ b/rclcpp/include/rclcpp/publisher_factory.hpp @@ -60,6 +60,35 @@ struct PublisherFactory const PublisherFactoryFunction create_typed_publisher; }; +/// Return a PublisherFactory with functions setup for creating a PublisherT. +template +PublisherFactory +create_generic_publisher_factory( + const rclcpp::PublisherOptionsWithAllocator & options, + const rosidl_message_type_support_t & type_support) +{ + PublisherFactory factory { + // factory function that creates a MessageT specific PublisherT + [options, type_support]( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic_name, + const rclcpp::QoS & qos + ) -> std::shared_ptr + { + auto publisher = std::make_shared( + node_base, topic_name, qos, options, type_support); + // This is used for setting up things like intra process comms which + // require this->shared_from_this() which cannot be called from + // the constructor. + publisher->post_init_setup(node_base, topic_name, qos, options); + return publisher; + } + }; + + // return the factory now that it is populated + return factory; +} + /// Return a PublisherFactory with functions setup for creating a PublisherT. template PublisherFactory diff --git a/rclcpp/include/rclcpp/subscription_factory.hpp b/rclcpp/include/rclcpp/subscription_factory.hpp index a0f265c803..6f3498045e 100644 --- a/rclcpp/include/rclcpp/subscription_factory.hpp +++ b/rclcpp/include/rclcpp/subscription_factory.hpp @@ -117,6 +117,62 @@ create_subscription_factory( // return the factory now that it is populated return factory; } +/// Return a SubscriptionFactory setup to create a SubscriptionT. +template< + typename MessageT, + typename CallbackT, + typename AllocatorT, + typename CallbackMessageT = + typename rclcpp::subscription_traits::has_message_type::type, + typename SubscriptionT = rclcpp::Subscription, + typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy< + CallbackMessageT, + AllocatorT + >> +SubscriptionFactory +create_generic_subscription_factory( + CallbackT && callback, + const rclcpp::SubscriptionOptionsWithAllocator & options, + typename MessageMemoryStrategyT::SharedPtr msg_mem_strat, + const rosidl_message_type_support_t & type_support) +{ + auto allocator = options.get_allocator(); + + using rclcpp::AnySubscriptionCallback; + AnySubscriptionCallback any_subscription_callback(allocator); + any_subscription_callback.set(std::forward(callback)); + + SubscriptionFactory factory { + // factory function that creates a MessageT specific SubscriptionT + [options, msg_mem_strat, any_subscription_callback, type_support]( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic_name, + const rclcpp::QoS & qos + ) -> rclcpp::SubscriptionBase::SharedPtr + { + using rclcpp::Subscription; + using rclcpp::SubscriptionBase; + + auto sub = Subscription::make_shared( + node_base, + type_support, + topic_name, + qos, + any_subscription_callback, + options, + msg_mem_strat); + // This is used for setting up things like intra process comms which + // require this->shared_from_this() which cannot be called from + // the constructor. + sub->post_init_setup(node_base, qos, options); + auto sub_base_ptr = std::dynamic_pointer_cast(sub); + return sub_base_ptr; + } + }; + + // return the factory now that it is populated + return factory; +} } // namespace rclcpp From 3b430bf81a11ad183b53863a1b80e3eb4832519d Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Tue, 21 Apr 2020 15:04:14 +0200 Subject: [PATCH 12/25] added test for intra process communication with generic publishers and subscribers Signed-off-by: Joshua Hampp --- rclcpp/CMakeLists.txt | 12 + .../test/test_intra_process_communication.cpp | 367 ++++++++++++++++++ 2 files changed, 379 insertions(+) create mode 100644 rclcpp/test/test_intra_process_communication.cpp diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 3918120e5a..c3dc4fed3f 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -253,6 +253,18 @@ if(BUILD_TESTING) "test_msgs" ) target_link_libraries(test_loaned_message ${PROJECT_NAME}) + ament_add_gtest(test_intra_process_communication test/test_intra_process_communication.cpp + TIMEOUT 120) + if(TARGET test_intra_process_communication) + ament_target_dependencies(test_intra_process_communication + "rcl" + "rcl_interfaces" + "rmw" + "rosidl_typesupport_cpp" + "test_msgs" + ) + endif() + target_link_libraries(test_intra_process_communication ${PROJECT_NAME}) ament_add_gtest(test_node test/test_node.cpp TIMEOUT 240) if(TARGET test_node) diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp new file mode 100644 index 0000000000..2e504d2456 --- /dev/null +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -0,0 +1,367 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include +#include +#include +#include +#include + +#include "rclcpp/exceptions.hpp" +#include "rclcpp/publisher.hpp" +#include "rclcpp/rclcpp.hpp" + +#include "rclcpp/serialized_message.hpp" + +int32_t & get_test_allocation_counter() +{ + static int32_t counter = 0; + return counter; +} + +void * custom_allocate(size_t size, void * state) +{ + static auto m_allocator = rcutils_get_default_allocator(); + + ++get_test_allocation_counter(); + auto r = m_allocator.allocate(size, state); + return r; +} + +void * custom_zero_allocate(size_t number_of_elements, size_t size_of_element, void * state) +{ + static auto m_allocator = rcutils_get_default_allocator(); + + ++get_test_allocation_counter(); + + auto r = m_allocator.zero_allocate(number_of_elements, size_of_element, state); + return r; +} + +void * custom_reallocate(void * pointer, size_t size, void * state) +{ + static auto m_allocator = rcutils_get_default_allocator(); + + if (pointer == nullptr) { + ++get_test_allocation_counter(); + } + + auto r = m_allocator.reallocate(pointer, size, state); + return r; +} + +void custom_deallocate(void * pointer, void * state) +{ + static auto m_allocator = rcutils_get_default_allocator(); + + --get_test_allocation_counter(); + m_allocator.deallocate(pointer, state); +} + +rcl_serialized_message_t make_serialized_string_msg( + const std::shared_ptr & stringMsg) +{ + auto m_allocator = rcutils_get_default_allocator(); + + // add custom (de)allocator to count the references to the object + m_allocator.allocate = &custom_allocate; + m_allocator.deallocate = &custom_deallocate; + m_allocator.reallocate = &custom_reallocate; + m_allocator.zero_allocate = &custom_zero_allocate; + + rcl_serialized_message_t msg = rmw_get_zero_initialized_serialized_message(); + auto ret = rmw_serialized_message_init(&msg, 0, &m_allocator); + if (ret != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret); + } + + static auto type = + rosidl_typesupport_cpp::get_message_type_support_handle + (); + auto error = rmw_serialize(stringMsg.get(), type, &msg); + if (error != RMW_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "test_intra_process_communication", + "Something went wrong preparing the serialized message"); + } + + return msg; +} + +/** + * Parameterized test. + * The first param are the NodeOptions used to create the nodes. + * The second param are the expected intraprocess count results. + */ +struct TestParameters +{ + rclcpp::NodeOptions node_options[2]; + uint64_t intraprocess_count_results[2]; + size_t runs; + std::string description; +}; + +std::ostream & operator<<(std::ostream & out, const TestParameters & params) +{ + out << params.description; + return out; +} + +class TestPublisherSubscriptionSerialized : public ::testing::TestWithParam +{ +public: + static void SetUpTestCase() + { + if (!rclcpp::ok()) { + rclcpp::init(0, nullptr); + } + } + +protected: + static std::chrono::milliseconds offset; +}; + +std::chrono::milliseconds TestPublisherSubscriptionSerialized::offset = std::chrono::milliseconds( + 250); +std::array counts; + +void OnMessageSerialized(const std::shared_ptr msg) +{ + EXPECT_NE(msg->buffer, nullptr); + EXPECT_GT(msg->buffer_capacity, 0u); + + ++counts[0]; +} + +void OnMessageConst(std::shared_ptr msg) +{ + EXPECT_EQ(msg->string_value.back(), '9'); + + ++counts[1]; +} + +void OnMessageUniquePtr(std::unique_ptr msg) +{ + EXPECT_EQ(msg->string_value.back(), '9'); + + ++counts[1]; +} + +void OnMessage(std::shared_ptr msg) +{ + EXPECT_EQ(msg->string_value.back(), '9'); + + ++counts[1]; +} + +TEST_P(TestPublisherSubscriptionSerialized, publish_serialized) +{ + get_test_allocation_counter() = 0; + + TestParameters parameters = GetParam(); + { + rclcpp::Node::SharedPtr node = std::make_shared( + "my_node", + "/ns", + parameters.node_options[0]); + auto publisher = node->create_publisher("/topic", 10); + + auto sub_shared = node->create_subscription( + "/topic", 10, + &OnMessage); + auto sub_unique = node->create_subscription( + "/topic", 10, + &OnMessageUniquePtr); + auto sub_const_shared = node->create_subscription( + "/topic", 10, + &OnMessageConst); + auto sub_serialized = node->create_subscription( + "/topic", 10, + &OnMessageSerialized); + + rclcpp::sleep_for(offset); + + counts.fill(0); + auto stringMsg = get_messages_strings()[3]; + + for (size_t i = 0; i < parameters.runs; i++) { + std::unique_ptr stringMsgU( + new test_msgs::msg::Strings( + *stringMsg)); + + publisher->publish(*stringMsg); + publisher->publish(std::move(stringMsgU)); + } + for (uint32_t i = 0; i < 3; ++i) { + rclcpp::spin_some(node); + rclcpp::sleep_for(offset); + } + + rclcpp::spin_some(node); + } + + if (parameters.runs == 1) { + EXPECT_EQ(counts[0], 2u); + EXPECT_EQ(counts[1], 6u); + } + + EXPECT_EQ(get_test_allocation_counter(), 0); +} + +TEST_P(TestPublisherSubscriptionSerialized, publish_serialized_generic) +{ + get_test_allocation_counter() = 0; + + TestParameters parameters = GetParam(); + { + rclcpp::Node::SharedPtr node = std::make_shared( + "my_node", + "/ns", + parameters.node_options[0]); + // create a generic publisher + auto publisher = rclcpp::create_generic_publisher( + node, + "/topic", + *rosidl_typesupport_cpp::get_message_type_support_handle(), + rclcpp::QoS(10)); + + // create a generic subscriber + auto sub_gen_serialized = rclcpp::create_generic_subscription( + node, + "/topic", + *rosidl_typesupport_cpp::get_message_type_support_handle(), + rclcpp::QoS(10), + &OnMessageSerialized); + + auto sub_shared = node->create_subscription( + "/topic", 10, + &OnMessage); + auto sub_unique = node->create_subscription( + "/topic", 10, + &OnMessageUniquePtr); + auto sub_const_shared = node->create_subscription( + "/topic", 10, + &OnMessageConst); + auto sub_serialized = node->create_subscription( + "/topic", 10, + &OnMessageSerialized); + + rclcpp::sleep_for(offset); + + counts.fill(0); + auto stringMsg = get_messages_strings()[3]; + + for (size_t i = 0; i < parameters.runs; i++) { + auto msg0 = make_serialized_string_msg(stringMsg); + + publisher->publish(std::make_unique(std::move(msg0))); + } + rclcpp::spin_some(node); + rclcpp::sleep_for(offset); + + rclcpp::spin_some(node); + } + + if (parameters.runs == 1) { + EXPECT_EQ(counts[0], 2u); + EXPECT_EQ(counts[1], 3u); + } + + EXPECT_EQ(get_test_allocation_counter(), 0); +} + +auto get_new_context() +{ + auto context = rclcpp::Context::make_shared(); + context->init(0, nullptr); + return context; +} + +std::vector parameters = { + /* + Testing publisher subscription count api and internal process subscription count. + Two subscriptions in the same topic, both using intraprocess comm. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(true), + rclcpp::NodeOptions().use_intra_process_comms(true) + }, + {1u, 2u}, + 1, + "two_subscriptions_intraprocess_comm" + }, + /* + Testing publisher subscription count api and internal process subscription count. + Two subscriptions, one using intra-process comm and the other not using it. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(true), + rclcpp::NodeOptions().use_intra_process_comms(false) + }, + {1u, 1u}, + 1, + "two_subscriptions_one_intraprocess_one_not" + }, + /* + Testing publisher subscription count api and internal process subscription count. + Two contexts, both using intra-process. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(true), + rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(true) + }, + {1u, 1u}, + 1, + "two_subscriptions_in_two_contexts_with_intraprocess_comm" + }, + /* + Testing publisher subscription count api and internal process subscription count. + Two contexts, both of them not using intra-process comm. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(false), + rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(false) + }, + {0u, 0u}, + 1, + "two_subscriptions_in_two_contexts_without_intraprocess_comm" + } +}; + +std::vector setRuns(const std::vector & in, const size_t runs) +{ + std::vector out = in; + for (auto & p : out) { + p.runs = runs; + } + return out; +} + +INSTANTIATE_TEST_CASE_P( + TestWithDifferentNodeOptions, TestPublisherSubscriptionSerialized, + ::testing::ValuesIn(parameters), + ::testing::PrintToStringParamName()); + +INSTANTIATE_TEST_CASE_P( + TestWithDifferentNodeOptions1000Runs, TestPublisherSubscriptionSerialized, + ::testing::ValuesIn(setRuns(parameters, 1000)), + ::testing::PrintToStringParamName()); From 88a69272088b9dcbdcf7fa93dea2684ce831eec4 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Tue, 21 Apr 2020 15:57:15 +0200 Subject: [PATCH 13/25] fixed date Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/generic_publisher.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/include/rclcpp/generic_publisher.hpp b/rclcpp/include/rclcpp/generic_publisher.hpp index 4ec713be96..5663852e0b 100644 --- a/rclcpp/include/rclcpp/generic_publisher.hpp +++ b/rclcpp/include/rclcpp/generic_publisher.hpp @@ -1,4 +1,4 @@ -// Copyright 2014 Open Source Robotics Foundation, Inc. +// Copyright 2020 Open Source Robotics Foundation, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. From 63ad2fe19c4ec6dd1fa6ae1c66b97611df3d7022 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Wed, 22 Apr 2020 06:29:13 +0200 Subject: [PATCH 14/25] fixed unintended change Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/publisher.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 2f717b3b45..fcccf0d798 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -274,7 +274,7 @@ class Publisher : public PublisherBase } protected: - virtual void + void do_inter_process_publish(const MessageT & msg) { generic_publisher::do_inter_process_publish(publisher_handle_, msg); From 86f8ef9274d908777fbfbe268d5fb87b393ac58d Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Wed, 22 Apr 2020 07:53:33 +0200 Subject: [PATCH 15/25] replaced generic_publisher Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/generic_publisher.hpp | 67 --------------------- rclcpp/include/rclcpp/publisher.hpp | 30 +++++++-- 2 files changed, 26 insertions(+), 71 deletions(-) delete mode 100644 rclcpp/include/rclcpp/generic_publisher.hpp diff --git a/rclcpp/include/rclcpp/generic_publisher.hpp b/rclcpp/include/rclcpp/generic_publisher.hpp deleted file mode 100644 index 5663852e0b..0000000000 --- a/rclcpp/include/rclcpp/generic_publisher.hpp +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright 2020 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RCLCPP__GENERIC_PUBLISHER_HPP_ -#define RCLCPP__GENERIC_PUBLISHER_HPP_ - -#include "rcl/error_handling.h" -#include "rcl/publisher.h" - -#include "rclcpp/exceptions.hpp" -#include "rclcpp/serialized_message.hpp" - -namespace rclcpp -{ - -namespace generic_publisher -{ - -template -inline void -do_inter_process_publish(rcl_publisher_t & publisher_handle, const MessageT & msg) -{ - auto status = rcl_publish(&publisher_handle, &msg, nullptr); - - if (RCL_RET_PUBLISHER_INVALID == status) { - rcl_reset_error(); // next call will reset error message if not context - if (rcl_publisher_is_valid_except_context(&publisher_handle)) { - rcl_context_t * context = rcl_publisher_get_context(&publisher_handle); - if (nullptr != context && !rcl_context_is_valid(context)) { - // publisher is invalid due to context being shutdown - return; - } - } - } - if (RCL_RET_OK != status) { - rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish message"); - } -} - -template<> -inline void -do_inter_process_publish( - rcl_publisher_t & publisher_handle, - const SerializedMessage & serialized_msg) -{ - auto status = rcl_publish_serialized_message(&publisher_handle, &serialized_msg, nullptr); - if (RCL_RET_OK != status) { - rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message"); - } -} - -} // namespace generic_publisher - -} // namespace rclcpp - -#endif // RCLCPP__GENERIC_PUBLISHER_HPP_ diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index fcccf0d798..d1a17c5635 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -32,7 +32,6 @@ #include "rclcpp/allocator/allocator_deleter.hpp" #include "rclcpp/detail/resolve_use_intra_process.hpp" #include "rclcpp/experimental/intra_process_manager.hpp" -#include "rclcpp/generic_publisher.hpp" #include "rclcpp/loaned_message.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" @@ -274,10 +273,33 @@ class Publisher : public PublisherBase } protected: - void - do_inter_process_publish(const MessageT & msg) + template + typename std::enable_if::value, void>::type + do_inter_process_publish(const T & msg) + { + auto status = rcl_publish(&publisher_handle_, &msg, nullptr); + + if (RCL_RET_PUBLISHER_INVALID == status) { + rcl_reset_error(); // next call will reset error message if not context + if (rcl_publisher_is_valid_except_context(&publisher_handle_)) { + rcl_context_t * context = rcl_publisher_get_context(&publisher_handle_); + if (nullptr != context && !rcl_context_is_valid(context)) { + // publisher is invalid due to context being shutdown + return; + } + } + } + if (RCL_RET_OK != status) { + rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish message"); + } + } + + template + typename std::enable_if::value, void>::type + do_inter_process_publish(const T & msg) { - generic_publisher::do_inter_process_publish(publisher_handle_, msg); + // for serialized messages the serialized method is needed + do_serialized_publish(&msg); } void From 62635ab42dacfaccf3673ee4b5ded9ff2fb32705 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Wed, 22 Apr 2020 07:54:23 +0200 Subject: [PATCH 16/25] removed unneeded template typename Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/create_publisher.hpp | 4 ++-- rclcpp/include/rclcpp/create_subscription.hpp | 3 +-- rclcpp/include/rclcpp/publisher_factory.hpp | 4 ++-- rclcpp/include/rclcpp/subscription_factory.hpp | 1 - 4 files changed, 5 insertions(+), 7 deletions(-) diff --git a/rclcpp/include/rclcpp/create_publisher.hpp b/rclcpp/include/rclcpp/create_publisher.hpp index 433077cd7d..76c6f8193c 100644 --- a/rclcpp/include/rclcpp/create_publisher.hpp +++ b/rclcpp/include/rclcpp/create_publisher.hpp @@ -35,7 +35,7 @@ namespace rclcpp * which returns a shared_ptr to a NodeTopicsInterface. */ template< - typename MessageT, + typename MessageT = SerializedMessage, typename AllocatorT = std::allocator, typename PublisherT = rclcpp::Publisher, typename NodeT> @@ -57,7 +57,7 @@ create_generic_publisher( // Create the publisher. auto pub = node_topics->create_publisher( topic_name, - rclcpp::create_generic_publisher_factory( + rclcpp::create_publisher_factory( options, type_support), qos diff --git a/rclcpp/include/rclcpp/create_subscription.hpp b/rclcpp/include/rclcpp/create_subscription.hpp index d2cdd2bc51..c9df202f27 100644 --- a/rclcpp/include/rclcpp/create_subscription.hpp +++ b/rclcpp/include/rclcpp/create_subscription.hpp @@ -36,7 +36,6 @@ namespace rclcpp * NodeTopicsInterface pointer itself. */ template< - typename MessageT, typename CallbackT, typename AllocatorT = std::allocator, typename CallbackMessageT = @@ -63,7 +62,7 @@ create_generic_subscription( using rclcpp::node_interfaces::get_node_topics_interface; auto node_topics = get_node_topics_interface(std::forward(node)); - auto factory = rclcpp::create_generic_subscription_factory( + auto factory = rclcpp::create_subscription_factory( std::forward(callback), options, msg_mem_strat, diff --git a/rclcpp/include/rclcpp/publisher_factory.hpp b/rclcpp/include/rclcpp/publisher_factory.hpp index ac4f729ed2..fb1723cb5d 100644 --- a/rclcpp/include/rclcpp/publisher_factory.hpp +++ b/rclcpp/include/rclcpp/publisher_factory.hpp @@ -61,14 +61,14 @@ struct PublisherFactory }; /// Return a PublisherFactory with functions setup for creating a PublisherT. -template +template PublisherFactory create_generic_publisher_factory( const rclcpp::PublisherOptionsWithAllocator & options, const rosidl_message_type_support_t & type_support) { PublisherFactory factory { - // factory function that creates a MessageT specific PublisherT + // factory function that creates a specific PublisherT [options, type_support]( rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic_name, diff --git a/rclcpp/include/rclcpp/subscription_factory.hpp b/rclcpp/include/rclcpp/subscription_factory.hpp index 6f3498045e..0586b8d078 100644 --- a/rclcpp/include/rclcpp/subscription_factory.hpp +++ b/rclcpp/include/rclcpp/subscription_factory.hpp @@ -119,7 +119,6 @@ create_subscription_factory( } /// Return a SubscriptionFactory setup to create a SubscriptionT. template< - typename MessageT, typename CallbackT, typename AllocatorT, typename CallbackMessageT = From d676d126d58fe36305f07c7db4397ecae86c922d Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Wed, 22 Apr 2020 07:54:46 +0200 Subject: [PATCH 17/25] renamed create_generic_... to create_ Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/create_publisher.hpp | 2 +- rclcpp/include/rclcpp/create_subscription.hpp | 2 +- rclcpp/include/rclcpp/publisher_factory.hpp | 2 +- rclcpp/include/rclcpp/subscription_factory.hpp | 2 +- rclcpp/test/test_intra_process_communication.cpp | 4 ++-- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/rclcpp/include/rclcpp/create_publisher.hpp b/rclcpp/include/rclcpp/create_publisher.hpp index 76c6f8193c..9e72e2a5cb 100644 --- a/rclcpp/include/rclcpp/create_publisher.hpp +++ b/rclcpp/include/rclcpp/create_publisher.hpp @@ -40,7 +40,7 @@ template< typename PublisherT = rclcpp::Publisher, typename NodeT> std::shared_ptr -create_generic_publisher( +create_publisher( NodeT & node, const std::string & topic_name, const rosidl_message_type_support_t & type_support, diff --git a/rclcpp/include/rclcpp/create_subscription.hpp b/rclcpp/include/rclcpp/create_subscription.hpp index c9df202f27..98f5ecd071 100644 --- a/rclcpp/include/rclcpp/create_subscription.hpp +++ b/rclcpp/include/rclcpp/create_subscription.hpp @@ -47,7 +47,7 @@ template< >, typename NodeT> typename std::shared_ptr -create_generic_subscription( +create_subscription( NodeT && node, const std::string & topic_name, const rosidl_message_type_support_t & type_support, diff --git a/rclcpp/include/rclcpp/publisher_factory.hpp b/rclcpp/include/rclcpp/publisher_factory.hpp index fb1723cb5d..f9ca967e4d 100644 --- a/rclcpp/include/rclcpp/publisher_factory.hpp +++ b/rclcpp/include/rclcpp/publisher_factory.hpp @@ -63,7 +63,7 @@ struct PublisherFactory /// Return a PublisherFactory with functions setup for creating a PublisherT. template PublisherFactory -create_generic_publisher_factory( +create_publisher_factory( const rclcpp::PublisherOptionsWithAllocator & options, const rosidl_message_type_support_t & type_support) { diff --git a/rclcpp/include/rclcpp/subscription_factory.hpp b/rclcpp/include/rclcpp/subscription_factory.hpp index 0586b8d078..a8663286b1 100644 --- a/rclcpp/include/rclcpp/subscription_factory.hpp +++ b/rclcpp/include/rclcpp/subscription_factory.hpp @@ -129,7 +129,7 @@ template< AllocatorT >> SubscriptionFactory -create_generic_subscription_factory( +create_subscription_factory( CallbackT && callback, const rclcpp::SubscriptionOptionsWithAllocator & options, typename MessageMemoryStrategyT::SharedPtr msg_mem_strat, diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index 2e504d2456..13bacdcb44 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -234,14 +234,14 @@ TEST_P(TestPublisherSubscriptionSerialized, publish_serialized_generic) "/ns", parameters.node_options[0]); // create a generic publisher - auto publisher = rclcpp::create_generic_publisher( + auto publisher = rclcpp::create_publisher( node, "/topic", *rosidl_typesupport_cpp::get_message_type_support_handle(), rclcpp::QoS(10)); // create a generic subscriber - auto sub_gen_serialized = rclcpp::create_generic_subscription( + auto sub_gen_serialized = rclcpp::create_subscription( node, "/topic", *rosidl_typesupport_cpp::get_message_type_support_handle(), From aeb69bf3ad45ddee9ba84078643832c8445f62ed Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Wed, 22 Apr 2020 08:39:01 +0200 Subject: [PATCH 18/25] updated to new SerializedMessage Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/publisher.hpp | 2 +- rclcpp/test/test_intra_process_communication.cpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index d1a17c5635..4b5cdd121d 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -299,7 +299,7 @@ class Publisher : public PublisherBase do_inter_process_publish(const T & msg) { // for serialized messages the serialized method is needed - do_serialized_publish(&msg); + do_serialized_publish(&msg.get_rcl_serialized_message()); } void diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index 13bacdcb44..f90fa805a5 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -142,8 +142,8 @@ std::array counts; void OnMessageSerialized(const std::shared_ptr msg) { - EXPECT_NE(msg->buffer, nullptr); - EXPECT_GT(msg->buffer_capacity, 0u); + EXPECT_NE(msg->get_rcl_serialized_message().buffer, nullptr); + EXPECT_GT(msg->get_rcl_serialized_message().buffer_capacity, 0u); ++counts[0]; } From 8d2de84f4e7a857674a8165fe4a26bbe8bd57a7c Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Wed, 29 Apr 2020 09:25:59 +0200 Subject: [PATCH 19/25] * added get_type_support_handle to rclcpp::Serialization * removed specialization to rclcpp::Serialization Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/serialization.hpp | 69 ++++++++++++++----------- 1 file changed, 38 insertions(+), 31 deletions(-) diff --git a/rclcpp/include/rclcpp/serialization.hpp b/rclcpp/include/rclcpp/serialization.hpp index 9666a81a5e..bc9063a9b8 100644 --- a/rclcpp/include/rclcpp/serialization.hpp +++ b/rclcpp/include/rclcpp/serialization.hpp @@ -48,6 +48,32 @@ struct is_serialized_message_class: std::true_type {}; } // namespace serialization_traits +namespace serialization +{ +template +inline const rosidl_message_type_support_t * +get_type_support_handle_impl() +{ + return rosidl_typesupport_cpp::get_message_type_support_handle(); +} + +// no message type support for rclcpp::SerializedMessage +template<> +inline const rosidl_message_type_support_t * +get_type_support_handle_impl() +{ + return nullptr; +} + +// no message type support for rcl_serialized_message_t +template<> +inline const rosidl_message_type_support_t * +get_type_support_handle_impl() +{ + return nullptr; +} +} // namespace serialization + /// Interface to (de)serialize a message class RCLCPP_PUBLIC_TYPE SerializationBase { @@ -78,6 +104,13 @@ class RCLCPP_PUBLIC_TYPE SerializationBase void deserialize_message( const SerializedMessage * serialized_message, void * ros_message) const; + /// Get the message type support for the serialized message + /** + * \return The message type support. + */ + virtual const rosidl_message_type_support_t * + get_type_support_handle() const = 0; + private: const rosidl_message_type_support_t * type_support_; }; @@ -89,39 +122,13 @@ class Serialization : public SerializationBase public: /// Constructor of Serialization Serialization() - : SerializationBase(rosidl_typesupport_cpp::get_message_type_support_handle()) - { - static_assert( - !serialization_traits::is_serialized_message_class::value, - "Serialization of serialized message to serialized message is not possible."); - } -}; - -/// Specialized serialization for rcl_serialized_message_t (because type support is not defined) -template<> -class Serialization: public SerializationBase -{ -public: - /// Constructor of Serialization - Serialization() - : SerializationBase(nullptr) - { - throw std::runtime_error( - "Serialization of rcl_serialized_message_t to serialized message is not possible."); - } -}; + : SerializationBase(get_type_support_handle()) + {} -/// Specialized serialization for SerializedMessage (because type support is not defined) -template<> -class Serialization: public SerializationBase -{ -public: - /// Constructor of Serialization - Serialization() - : SerializationBase(nullptr) + const rosidl_message_type_support_t * + get_type_support_handle() const override { - throw std::runtime_error( - "Serialization of SerializedMessage to serialized message is not possible."); + return serialization::get_type_support_handle_impl(); } }; From e2c19bb98e919a671fc40746b08954a243fcf0d8 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Wed, 29 Apr 2020 09:29:37 +0200 Subject: [PATCH 20/25] extended subscription_intra_process(_base): * allocate message * getter for serialization * provide untyped message Signed-off-by: Joshua Hampp --- .../subscription_intra_process.hpp | 34 ++++++++++++------- .../subscription_intra_process_base.hpp | 13 +++++-- 2 files changed, 33 insertions(+), 14 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index e7af4cef54..4ce8e48da5 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -129,10 +129,9 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase } void - provide_intra_process_message(ConstMessageSharedPtr message) + provide_intra_process_message(std::shared_ptr message) { - buffer_->add_shared(std::move(message)); - trigger_guard_condition(); + provide_intra_process_message_impl(std::static_pointer_cast(message)); } void @@ -148,18 +147,22 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase return buffer_->use_take_shared_method(); } - void - provide_intra_process_message(const SerializedMessage & serialized_message) + std::shared_ptr + create_shared_message(const void * message_to_copy) { - rclcpp::Serialization serialization; - - auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); - MessageAllocTraits::construct(*message_allocator_.get(), ptr); - auto message = MessageUniquePtr(ptr); + if (nullptr != message_to_copy) { + return std::allocate_shared( + *message_allocator_, + *reinterpret_cast(message_to_copy)); + } - serialization.deserialize_message(&serialized_message, reinterpret_cast(message.get())); + return std::allocate_shared(*message_allocator_); + } - provide_intra_process_message(std::move(message)); + std::unique_ptr + get_serialization() + { + return std::make_unique>(); } private: @@ -170,6 +173,13 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase (void)ret; } + void + provide_intra_process_message_impl(ConstMessageSharedPtr message) + { + buffer_->add_shared(std::move(message)); + trigger_guard_condition(); + } + template typename std::enable_if::value, void>::type execute_impl() diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp index 84af55fa0c..a7fae5bac2 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp @@ -72,12 +72,21 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable rmw_qos_profile_t get_actual_qos() const; + RCLCPP_PUBLIC virtual bool is_serialized() const = 0; + RCLCPP_PUBLIC virtual void - provide_intra_process_message( - const SerializedMessage & serialized_message) = 0; + provide_intra_process_message(std::shared_ptr message) = 0; + + RCLCPP_PUBLIC + virtual std::shared_ptr + create_shared_message(const void * message_to_copy = nullptr) = 0; + + RCLCPP_PUBLIC + virtual std::unique_ptr + get_serialization() = 0; protected: std::recursive_mutex reentrant_mutex_; From 370061be733e7c9fba92e9a052cfb3395ed0af26 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Wed, 29 Apr 2020 09:32:19 +0200 Subject: [PATCH 21/25] update unit test for subscription_intra_process(_base) Signed-off-by: Joshua Hampp --- rclcpp/test/test_intra_process_manager.cpp | 32 +++++++++++++++++----- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/rclcpp/test/test_intra_process_manager.cpp b/rclcpp/test/test_intra_process_manager.cpp index f7419e08f3..369df01b7f 100644 --- a/rclcpp/test/test_intra_process_manager.cpp +++ b/rclcpp/test/test_intra_process_manager.cpp @@ -220,11 +220,14 @@ class SubscriptionIntraProcessBase return false; } - void - provide_intra_process_message(const rclcpp::SerializedMessage & serialized_message) - { - (void)serialized_message; - } + virtual void + provide_intra_process_message(std::shared_ptr message) = 0; + + virtual std::shared_ptr + create_shared_message(const void * message_to_copy = nullptr) = 0; + + virtual std::unique_ptr + get_serialization() = 0; rmw_qos_profile_t qos_profile; const char * topic_name; @@ -243,9 +246,9 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase } void - provide_intra_process_message(std::shared_ptr msg) + provide_intra_process_message(std::shared_ptr message) { - buffer->add(msg); + buffer->add(std::static_pointer_cast(message)); } void @@ -254,6 +257,21 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase buffer->add(std::move(msg)); } + virtual std::shared_ptr + create_shared_message(const void * message_to_copy = nullptr) + { + if (nullptr != message_to_copy) { + return std::make_shared(*reinterpret_cast(message_to_copy)); + } + return std::make_shared(); + } + + virtual std::unique_ptr + get_serialization() + { + return std::make_unique>(); + } + std::uintptr_t pop() { From 50334ba6bf3ebc6a4daafce5e13ccd1dd833d094 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Wed, 29 Apr 2020 09:35:37 +0200 Subject: [PATCH 22/25] extended intra_process_manager for serialized messages: * two extra subscription/publisher pairs (for serialized ones) * added methods for (de)serialization * added methods for messages without known allocator * splitted up publishing with same type and not matching message type Signed-off-by: Joshua Hampp --- .../experimental/intra_process_manager.hpp | 453 ++++++++++++------ rclcpp/src/rclcpp/intra_process_manager.cpp | 47 +- 2 files changed, 338 insertions(+), 162 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index a2d765ce64..d80acc2cf5 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -41,6 +41,8 @@ #include "rclcpp/serialized_message.hpp" #include "rclcpp/visibility_control.hpp" +#include "rcpputils/asserts.hpp" + namespace rclcpp { @@ -185,8 +187,21 @@ class IntraProcessManager std::unique_ptr message, std::shared_ptr::allocator_type> allocator) { - using MessageAllocTraits = allocator::AllocRebind; - using MessageAllocatorT = typename MessageAllocTraits::allocator_type; + constexpr bool is_serialized_publisher = + serialization_traits::is_serialized_message_class::value; + + constexpr auto index_ownership_same = SplittedSubscriptions::get_index( + false, + is_serialized_publisher); + constexpr auto index_shared_same = SplittedSubscriptions::get_index( + true, + is_serialized_publisher); + constexpr auto index_ownership_other = SplittedSubscriptions::get_index( + false, + !is_serialized_publisher); + constexpr auto index_shared_other = SplittedSubscriptions::get_index( + true, + !is_serialized_publisher); std::shared_lock lock(mutex_); @@ -200,40 +215,22 @@ class IntraProcessManager } const auto & sub_ids = publisher_it->second; - if (sub_ids.take_ownership_subscriptions.empty()) { - // None of the buffers require ownership, so we promote the pointer - std::shared_ptr msg = std::move(message); - - this->template add_shared_msg_to_buffers(msg, sub_ids.take_shared_subscriptions); - } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT - sub_ids.take_shared_subscriptions.size() <= 1) - { - // There is at maximum 1 buffer that does not require ownership. - // So we this case is equivalent to all the buffers requiring ownership - - // Merge the two vector of ids into a unique one - std::vector concatenated_vector(sub_ids.take_shared_subscriptions); - concatenated_vector.insert( - concatenated_vector.end(), - sub_ids.take_ownership_subscriptions.begin(), - sub_ids.take_ownership_subscriptions.end()); - - this->template add_owned_msg_to_buffers( - std::move(message), - concatenated_vector, - allocator); - } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT - sub_ids.take_shared_subscriptions.size() > 1) + // check if (de)serialization is needed + if (sub_ids.take_subscriptions[index_ownership_other].size() + + sub_ids.take_subscriptions[index_shared_other].size() > 0) { - // Construct a new shared pointer from the message - // for the buffers that do not require ownership - auto shared_msg = std::allocate_shared(*allocator, *message); - - this->template add_shared_msg_to_buffers( - shared_msg, sub_ids.take_shared_subscriptions); - this->template add_owned_msg_to_buffers( - std::move(message), sub_ids.take_ownership_subscriptions, allocator); + do_intra_process_publish_other_type( + message.get(), + sub_ids.take_subscriptions[index_ownership_other], + sub_ids.take_subscriptions[index_shared_other] + ); } + + do_intra_process_publish_same_type( + std::move(message), allocator, + sub_ids.take_subscriptions[index_ownership_same], + sub_ids.take_subscriptions[index_shared_same] + ); } template< @@ -246,8 +243,21 @@ class IntraProcessManager std::unique_ptr message, std::shared_ptr::allocator_type> allocator) { - using MessageAllocTraits = allocator::AllocRebind; - using MessageAllocatorT = typename MessageAllocTraits::allocator_type; + constexpr bool is_serialized_publisher = + serialization_traits::is_serialized_message_class::value; + + constexpr auto index_ownership_same = SplittedSubscriptions::get_index( + false, + is_serialized_publisher); + constexpr auto index_shared_same = SplittedSubscriptions::get_index( + true, + is_serialized_publisher); + constexpr auto index_ownership_other = SplittedSubscriptions::get_index( + false, + !is_serialized_publisher); + constexpr auto index_shared_other = SplittedSubscriptions::get_index( + true, + !is_serialized_publisher); std::shared_lock lock(mutex_); @@ -261,33 +271,22 @@ class IntraProcessManager } const auto & sub_ids = publisher_it->second; - if (sub_ids.take_ownership_subscriptions.empty()) { - // If there are no owning, just convert to shared. - std::shared_ptr shared_msg = std::move(message); - if (!sub_ids.take_shared_subscriptions.empty()) { - this->template add_shared_msg_to_buffers( - shared_msg, sub_ids.take_shared_subscriptions); - } - return shared_msg; - } else { - // Construct a new shared pointer from the message for the buffers that - // do not require ownership and to return. - auto shared_msg = std::allocate_shared(*allocator, *message); - - if (!sub_ids.take_shared_subscriptions.empty()) { - this->template add_shared_msg_to_buffers( - shared_msg, - sub_ids.take_shared_subscriptions); - } - if (!sub_ids.take_ownership_subscriptions.empty()) { - this->template add_owned_msg_to_buffers( - std::move(message), - sub_ids.take_ownership_subscriptions, - allocator); - } - - return shared_msg; + // check if (de)serialization is needed + if (sub_ids.take_subscriptions[index_ownership_other].size() + + sub_ids.take_subscriptions[index_shared_other].size() > 0) + { + do_intra_process_publish_other_type( + message.get(), + sub_ids.take_subscriptions[index_ownership_other], + sub_ids.take_subscriptions[index_shared_other] + ); } + + return do_intra_process_publish_and_return_shared_same_type( + std::move(message), allocator, + sub_ids.take_subscriptions[index_ownership_same], + sub_ids.take_subscriptions[index_shared_same] + ); } /// Return true if the given rmw_gid_t matches any stored Publishers. @@ -326,8 +325,22 @@ class IntraProcessManager struct SplittedSubscriptions { - std::vector take_shared_subscriptions; - std::vector take_ownership_subscriptions; + enum + { + IndexSharedTyped = 0, IndexSharedSerialized = 1, + IndexOwnershipTyped = 2, IndexOwnershipSerialized = 3, + IndexNum = 4 + }; + + /// get the index for "take_subscriptions" depending on shared/serialized + constexpr static auto + get_index(const bool is_shared, const bool is_serialized) + { + return (is_serialized ? IndexSharedTyped : IndexSharedSerialized) + + (is_shared ? IndexSharedTyped : IndexOwnershipTyped); + } + + std::vector take_subscriptions[IndexNum]; }; using SubscriptionMap = @@ -346,21 +359,165 @@ class IntraProcessManager RCLCPP_PUBLIC void - insert_sub_id_for_pub(uint64_t sub_id, uint64_t pub_id, bool use_take_shared_method); + insert_sub_id_for_pub( + uint64_t sub_id, uint64_t pub_id, bool use_take_shared_method, + bool is_serialized_subscriber); RCLCPP_PUBLIC bool can_communicate(PublisherInfo pub_info, SubscriptionInfo sub_info) const; + template< + typename MessageT, + typename Alloc = std::allocator, + typename Deleter = std::default_delete> + void + do_intra_process_publish_same_type( + std::unique_ptr message, + std::shared_ptr::allocator_type> allocator, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) + { + using MessageAllocTraits = allocator::AllocRebind; + using MessageAllocatorT = typename MessageAllocTraits::allocator_type; + + if (take_ownership_subscriptions.empty()) { + // None of the buffers require ownership, so we promote the pointer + std::shared_ptr msg = std::move(message); + + this->template add_shared_msg_to_buffers(msg, take_shared_subscriptions); + } else if (!take_ownership_subscriptions.empty() && // NOLINT + take_shared_subscriptions.size() <= 1) + { + // There is at maximum 1 buffer that does not require ownership. + // So we this case is equivalent to all the buffers requiring ownership + + // Merge the two vector of ids into a unique one + std::vector concatenated_vector(take_shared_subscriptions); + concatenated_vector.insert( + concatenated_vector.end(), + take_ownership_subscriptions.begin(), + take_ownership_subscriptions.end()); + + this->template add_owned_msg_to_buffers( + std::move(message), + concatenated_vector, + allocator); + } else if (!take_ownership_subscriptions.empty() && // NOLINT + take_shared_subscriptions.size() > 1) + { + // Construct a new shared pointer from the message + // for the buffers that do not require ownership + auto shared_msg = std::allocate_shared(*allocator, *message); + + this->template add_shared_msg_to_buffers( + shared_msg, take_shared_subscriptions); + this->template add_owned_msg_to_buffers( + std::move(message), take_ownership_subscriptions, allocator); + } + } + + template + void + do_intra_process_publish_other_type( + const MessageT * unsupported_message, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) + { + // get first subscription + const auto subscription_id = + take_ownership_subscriptions.empty() ? + take_shared_subscriptions.front() : + take_ownership_subscriptions.front(); + + auto subscription_it = subscriptions_.find(subscription_id); + if (subscription_it == subscriptions_.end()) { + throw std::runtime_error("subscription has unexpectedly gone out of scope"); + } + auto subscription_base = subscription_it->second.subscription; + + // convert published message to the supported type + auto message = convert_message(unsupported_message, subscription_base); + + if (take_ownership_subscriptions.empty()) { + // None of the buffers require ownership, so we promote the pointer + this->template add_shared_msg_to_buffers(std::move(message), take_shared_subscriptions); + } else if (!take_ownership_subscriptions.empty() && // NOLINT + take_shared_subscriptions.size() <= 1) + { + // There is at maximum 1 buffer that does not require ownership. + // So we this case is equivalent to all the buffers requiring ownership + + // Merge the two vector of ids into a unique one + std::vector concatenated_vector(take_shared_subscriptions); + concatenated_vector.insert( + concatenated_vector.end(), + take_ownership_subscriptions.begin(), + take_ownership_subscriptions.end()); + + add_owned_msg_to_buffers(std::move(message), concatenated_vector); + } else if (!take_ownership_subscriptions.empty() && // NOLINT + take_shared_subscriptions.size() > 1) + { + // Construct a new shared pointer from the message + // for the buffers that do not require ownership + auto shared_msg = subscription_base->create_shared_message(message.get()); + + this->template add_shared_msg_to_buffers( + shared_msg, take_shared_subscriptions); + add_owned_msg_to_buffers(std::move(message), take_ownership_subscriptions); + } + } + + template< + typename MessageT, + typename Alloc = std::allocator, + typename Deleter = std::default_delete> + std::shared_ptr + do_intra_process_publish_and_return_shared_same_type( + std::unique_ptr message, + std::shared_ptr::allocator_type> allocator, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) + { + using MessageAllocTraits = allocator::AllocRebind; + using MessageAllocatorT = typename MessageAllocTraits::allocator_type; + + if (take_ownership_subscriptions.empty()) { + // If there are no owning, just convert to shared. + std::shared_ptr shared_msg = std::move(message); + if (!take_shared_subscriptions.empty()) { + this->template add_shared_msg_to_buffers( + shared_msg, take_shared_subscriptions); + } + return shared_msg; + } else { + // Construct a new shared pointer from the message for the buffers that + // do not require ownership and to return. + auto shared_msg = std::allocate_shared(*allocator, *message); + + if (!take_shared_subscriptions.empty()) { + this->template add_shared_msg_to_buffers( + shared_msg, + take_shared_subscriptions); + } + if (!take_ownership_subscriptions.empty()) { + this->template add_owned_msg_to_buffers( + std::move(message), + take_ownership_subscriptions, + allocator); + } + + return shared_msg; + } + } + template void add_shared_msg_to_buffers( std::shared_ptr message, std::vector subscription_ids) { - constexpr bool is_serialized_publisher = - serialization_traits::is_serialized_message_class::value; - for (auto id : subscription_ids) { auto subscription_it = subscriptions_.find(id); if (subscription_it == subscriptions_.end()) { @@ -368,29 +525,7 @@ class IntraProcessManager } auto subscription_base = subscription_it->second.subscription; - // same message type, so it can just be passed through - if (subscription_base->is_serialized() == is_serialized_publisher) { - auto subscription = std::static_pointer_cast< - rclcpp::experimental::SubscriptionIntraProcess - >(subscription_base); - - subscription->provide_intra_process_message(message); - } else if (is_serialized_publisher) { - // publisher provides a serialized message, while subscriber expects a ROS message - provide_intra_process_message(subscription_base, *message); - } else { - // publisher provides a ROS message, while subscriber expects a serialized message - rclcpp::Serialization serialization; - auto subscription = std::static_pointer_cast< - rclcpp::experimental::SubscriptionIntraProcess - >(subscription_base); - - auto serialized_message = std::make_unique(); - serialization.serialize_message( - reinterpret_cast(message.get()), serialized_message.get()); - - subscription->provide_intra_process_message(std::move(serialized_message)); - } + subscription_base->provide_intra_process_message(message); } } @@ -407,9 +542,6 @@ class IntraProcessManager using MessageAllocTraits = allocator::AllocRebind; using MessageUniquePtr = std::unique_ptr; - constexpr bool is_serialized_publisher = - serialization_traits::is_serialized_message_class::value; - for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) { auto subscription_it = subscriptions_.find(*it); if (subscription_it == subscriptions_.end()) { @@ -417,61 +549,106 @@ class IntraProcessManager } auto subscription_base = subscription_it->second.subscription; - // same message type, so it can just be passed through - if (subscription_base->is_serialized() == is_serialized_publisher) { - auto subscription = std::static_pointer_cast< - rclcpp::experimental::SubscriptionIntraProcess - >(subscription_base); - - if (std::next(it) == subscription_ids.end()) { - // If this is the last subscription, give up ownership - subscription->provide_intra_process_message(std::move(message)); - } else { - // Copy the message since we have additional subscriptions to serve - MessageUniquePtr copy_message; - Deleter deleter = message.get_deleter(); - auto ptr = MessageAllocTraits::allocate(*allocator.get(), 1); - MessageAllocTraits::construct(*allocator.get(), ptr, *message); - copy_message = MessageUniquePtr(ptr, deleter); - - subscription->provide_intra_process_message(std::move(copy_message)); - } - } else if (is_serialized_publisher) { - // publisher provides a serialized message, while subscriber expects a ROS message - provide_intra_process_message(subscription_base, *message); + auto subscription = std::static_pointer_cast< + rclcpp::experimental::SubscriptionIntraProcess + >(subscription_base); + + if (std::next(it) == subscription_ids.end()) { + // If this is the last subscription, give up ownership + subscription->provide_intra_process_message(std::move(message)); } else { - // publisher provides a ROS message, while subscriber expects a serialized message - rclcpp::Serialization serialization; - auto subscription = std::static_pointer_cast< - rclcpp::experimental::SubscriptionIntraProcess - >(subscription_base); + // Copy the message since we have additional subscriptions to serve + MessageUniquePtr copy_message; + Deleter deleter = message.get_deleter(); + auto ptr = MessageAllocTraits::allocate(*allocator.get(), 1); + MessageAllocTraits::construct(*allocator.get(), ptr, *message); + copy_message = MessageUniquePtr(ptr, deleter); + + subscription->provide_intra_process_message(std::move(copy_message)); + } + } + } - auto serialized_message = std::make_unique(); - serialization.serialize_message( - reinterpret_cast(message.get()), serialized_message.get()); + /// method for unknown allocator using subscription for allocation + void + add_owned_msg_to_buffers( + std::shared_ptr message, + std::vector subscription_ids) + { + for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) { + auto subscription_it = subscriptions_.find(*it); + if (subscription_it == subscriptions_.end()) { + throw std::runtime_error("subscription has unexpectedly gone out of scope"); + } + auto subscription_base = subscription_it->second.subscription; + + if (std::next(it) == subscription_ids.end()) { + // If this is the last subscription, give up ownership + subscription_base->provide_intra_process_message(std::move(message)); + } else { + // Copy the message since we have additional subscriptions to serve + std::shared_ptr copy_message = + subscription_base->create_shared_message(message.get()); - subscription->provide_intra_process_message(std::move(serialized_message)); + subscription_base->provide_intra_process_message(std::move(copy_message)); } } } - template - static std::enable_if_t::value> - provide_intra_process_message( - rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr & subscription, - const T & serialized_message) + /// Convert received message to message type of subscriber + /** + * Method for ros message for serialization to rclcpp::SerializedMessage. + * The publisher has a template type while the subscribers is serialized. + * + * In addition this generates a unique intra process id for the publisher. + * + * \param message the ros message from publisher. + * \param subscription a subscriber used for creating the serialized message. + * \return a shared pointer (containing rclcpp::SerializedMessage) with deleter. + */ + template + std::shared_ptr convert_message( + const MessageT * message, + rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription) { - (void)subscription; - (void)serialized_message; - // prevent call if actual message type does not match + // serialize + auto serialized_message = subscription->create_shared_message(); + rclcpp::Serialization serialization; + + rcpputils::check_true(nullptr != serialized_message, "Serialized message is nullpointer."); + + serialization.serialize_message( + message, + reinterpret_cast(serialized_message.get())); + + return serialized_message; } - static void - provide_intra_process_message( - rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr & subscription, - const SerializedMessage & serialized_message) + /// Convert received message to message type of subscriber + /** + * Overloaded method for rclcpp::SerializedMessage for deserialization. + * The publisher is serialized while the subscribers have a templated message type. + * + * In addition this generates a unique intra process id for the publisher. + * + * \param serialized_message the serialized message from publisher. + * \param subscription a subscriber used for creating ros message and serialization. + * \return a shared pointer (containing a ros message) with deleter. + */ + std::shared_ptr convert_message( + const SerializedMessage * serialized_message, + rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription) { - subscription->provide_intra_process_message(serialized_message); + // deserialize + auto converted_message = subscription->create_shared_message(); + auto serialization = subscription->get_serialization(); + + rcpputils::check_true(nullptr != converted_message, "Converted message is nullpointer."); + rcpputils::check_true(nullptr != serialization, "Serialization is nullpointer."); + + serialization->deserialize_message(serialized_message, converted_message.get()); + + return converted_message; } PublisherToSubscriptionIdsMap pub_to_subs_; diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index 0b9c9d6670..876badc069 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -48,7 +48,9 @@ IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher) // create an entry for the publisher id and populate with already existing subscriptions for (auto & pair : subscriptions_) { if (can_communicate(publishers_[id], pair.second)) { - insert_sub_id_for_pub(pair.first, id, pair.second.use_take_shared_method); + insert_sub_id_for_pub( + pair.first, id, pair.second.use_take_shared_method, + pair.second.subscription->is_serialized()); } } @@ -70,7 +72,9 @@ IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr su // adds the subscription id to all the matchable publishers for (auto & pair : publishers_) { if (can_communicate(pair.second, subscriptions_[id])) { - insert_sub_id_for_pub(id, pair.first, subscriptions_[id].use_take_shared_method); + insert_sub_id_for_pub( + id, pair.first, subscriptions_[id].use_take_shared_method, + subscription->is_serialized()); } } @@ -85,19 +89,14 @@ IntraProcessManager::remove_subscription(uint64_t intra_process_subscription_id) subscriptions_.erase(intra_process_subscription_id); for (auto & pair : pub_to_subs_) { - pair.second.take_shared_subscriptions.erase( - std::remove( - pair.second.take_shared_subscriptions.begin(), - pair.second.take_shared_subscriptions.end(), - intra_process_subscription_id), - pair.second.take_shared_subscriptions.end()); - - pair.second.take_ownership_subscriptions.erase( - std::remove( - pair.second.take_ownership_subscriptions.begin(), - pair.second.take_ownership_subscriptions.end(), - intra_process_subscription_id), - pair.second.take_ownership_subscriptions.end()); + for (auto i = 0u; i < SplittedSubscriptions::IndexNum; ++i) { + pair.second.take_subscriptions[i].erase( + std::remove( + pair.second.take_subscriptions[i].begin(), + pair.second.take_subscriptions[i].end(), + intra_process_subscription_id), + pair.second.take_subscriptions[i].end()); + } } } @@ -141,9 +140,10 @@ IntraProcessManager::get_subscription_count(uint64_t intra_process_publisher_id) return 0; } - auto count = - publisher_it->second.take_shared_subscriptions.size() + - publisher_it->second.take_ownership_subscriptions.size(); + auto count = 0u; + for (auto i = 0u; i < SplittedSubscriptions::IndexNum; ++i) { + count += publisher_it->second.take_subscriptions[i].size(); + } return count; } @@ -187,13 +187,12 @@ void IntraProcessManager::insert_sub_id_for_pub( uint64_t sub_id, uint64_t pub_id, - bool use_take_shared_method) + bool use_take_shared_method, + bool is_serialized_subscriber) { - if (use_take_shared_method) { - pub_to_subs_[pub_id].take_shared_subscriptions.push_back(sub_id); - } else { - pub_to_subs_[pub_id].take_ownership_subscriptions.push_back(sub_id); - } + pub_to_subs_[pub_id].take_subscriptions[SplittedSubscriptions::get_index( + use_take_shared_method, + is_serialized_subscriber)].push_back(sub_id); } bool From 2e5a7bdd1c978e8857bb7839e79567c9f489eb9a Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Wed, 29 Apr 2020 18:36:08 +0200 Subject: [PATCH 23/25] simplified code Signed-off-by: Joshua Hampp --- .../experimental/intra_process_manager.hpp | 68 +++++++++---------- 1 file changed, 31 insertions(+), 37 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index d80acc2cf5..f4e800965f 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -190,18 +190,7 @@ class IntraProcessManager constexpr bool is_serialized_publisher = serialization_traits::is_serialized_message_class::value; - constexpr auto index_ownership_same = SplittedSubscriptions::get_index( - false, - is_serialized_publisher); - constexpr auto index_shared_same = SplittedSubscriptions::get_index( - true, - is_serialized_publisher); - constexpr auto index_ownership_other = SplittedSubscriptions::get_index( - false, - !is_serialized_publisher); - constexpr auto index_shared_other = SplittedSubscriptions::get_index( - true, - !is_serialized_publisher); + using Indicies = SplittedSubscriptionsIndicies; std::shared_lock lock(mutex_); @@ -216,20 +205,20 @@ class IntraProcessManager const auto & sub_ids = publisher_it->second; // check if (de)serialization is needed - if (sub_ids.take_subscriptions[index_ownership_other].size() + - sub_ids.take_subscriptions[index_shared_other].size() > 0) + if (sub_ids.take_subscriptions[Indicies::ownership_other].size() + + sub_ids.take_subscriptions[Indicies::shared_other].size() > 0) { do_intra_process_publish_other_type( message.get(), - sub_ids.take_subscriptions[index_ownership_other], - sub_ids.take_subscriptions[index_shared_other] + sub_ids.take_subscriptions[Indicies::ownership_other], + sub_ids.take_subscriptions[Indicies::shared_other] ); } do_intra_process_publish_same_type( std::move(message), allocator, - sub_ids.take_subscriptions[index_ownership_same], - sub_ids.take_subscriptions[index_shared_same] + sub_ids.take_subscriptions[Indicies::ownership_same], + sub_ids.take_subscriptions[Indicies::shared_same] ); } @@ -245,19 +234,7 @@ class IntraProcessManager { constexpr bool is_serialized_publisher = serialization_traits::is_serialized_message_class::value; - - constexpr auto index_ownership_same = SplittedSubscriptions::get_index( - false, - is_serialized_publisher); - constexpr auto index_shared_same = SplittedSubscriptions::get_index( - true, - is_serialized_publisher); - constexpr auto index_ownership_other = SplittedSubscriptions::get_index( - false, - !is_serialized_publisher); - constexpr auto index_shared_other = SplittedSubscriptions::get_index( - true, - !is_serialized_publisher); + using Indicies = SplittedSubscriptionsIndicies; std::shared_lock lock(mutex_); @@ -272,20 +249,20 @@ class IntraProcessManager const auto & sub_ids = publisher_it->second; // check if (de)serialization is needed - if (sub_ids.take_subscriptions[index_ownership_other].size() + - sub_ids.take_subscriptions[index_shared_other].size() > 0) + if (sub_ids.take_subscriptions[Indicies::ownership_other].size() + + sub_ids.take_subscriptions[Indicies::shared_other].size() > 0) { do_intra_process_publish_other_type( message.get(), - sub_ids.take_subscriptions[index_ownership_other], - sub_ids.take_subscriptions[index_shared_other] + sub_ids.take_subscriptions[Indicies::ownership_other], + sub_ids.take_subscriptions[Indicies::shared_other] ); } return do_intra_process_publish_and_return_shared_same_type( std::move(message), allocator, - sub_ids.take_subscriptions[index_ownership_same], - sub_ids.take_subscriptions[index_shared_same] + sub_ids.take_subscriptions[Indicies::ownership_same], + sub_ids.take_subscriptions[Indicies::shared_same] ); } @@ -343,6 +320,23 @@ class IntraProcessManager std::vector take_subscriptions[IndexNum]; }; + template + struct SplittedSubscriptionsIndicies + { + constexpr static auto ownership_same = SplittedSubscriptions::get_index( + false, + is_serialized); + constexpr static auto shared_same = SplittedSubscriptions::get_index( + true, + is_serialized); + constexpr static auto ownership_other = SplittedSubscriptions::get_index( + false, + !is_serialized); + constexpr static auto shared_other = SplittedSubscriptions::get_index( + true, + !is_serialized); + }; + using SubscriptionMap = std::unordered_map; From 6b3d9c79b0adecbc942ce85c4e9eacff1e254084 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Wed, 29 Apr 2020 18:36:18 +0200 Subject: [PATCH 24/25] fixed modifier Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/experimental/intra_process_manager.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index f4e800965f..1da71f5ed3 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -566,7 +566,7 @@ class IntraProcessManager /// method for unknown allocator using subscription for allocation void add_owned_msg_to_buffers( - std::shared_ptr message, + std::shared_ptr message, std::vector subscription_ids) { for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) { From f5c33629947d42aa7d2f134639725f5367875332 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Wed, 29 Apr 2020 18:36:41 +0200 Subject: [PATCH 25/25] updated comments Signed-off-by: Joshua Hampp --- .../experimental/intra_process_manager.hpp | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index 1da71f5ed3..39a5d16aa1 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -189,7 +189,6 @@ class IntraProcessManager { constexpr bool is_serialized_publisher = serialization_traits::is_serialized_message_class::value; - using Indicies = SplittedSubscriptionsIndicies; std::shared_lock lock(mutex_); @@ -372,6 +371,7 @@ class IntraProcessManager const std::vector & take_ownership_subscriptions, const std::vector & take_shared_subscriptions) { + // subsriber and publisher have same message types using MessageAllocTraits = allocator::AllocRebind; using MessageAllocatorT = typename MessageAllocTraits::allocator_type; @@ -384,9 +384,9 @@ class IntraProcessManager take_shared_subscriptions.size() <= 1) { // There is at maximum 1 buffer that does not require ownership. - // So we this case is equivalent to all the buffers requiring ownership + // So this case is equivalent to all the buffers requiring ownership - // Merge the two vector of ids into a unique one + // Merge the two vectors of ids into a unique one std::vector concatenated_vector(take_shared_subscriptions); concatenated_vector.insert( concatenated_vector.end(), @@ -418,6 +418,7 @@ class IntraProcessManager const std::vector & take_ownership_subscriptions, const std::vector & take_shared_subscriptions) { + // subsriber and publisher have different message types // get first subscription const auto subscription_id = take_ownership_subscriptions.empty() ? @@ -474,6 +475,7 @@ class IntraProcessManager const std::vector & take_ownership_subscriptions, const std::vector & take_shared_subscriptions) { + // subsriber and publisher have same message types using MessageAllocTraits = allocator::AllocRebind; using MessageAllocatorT = typename MessageAllocTraits::allocator_type; @@ -563,7 +565,7 @@ class IntraProcessManager } } - /// method for unknown allocator using subscription for allocation + /// Method for unknown allocator using subscription for allocation void add_owned_msg_to_buffers( std::shared_ptr message, @@ -591,11 +593,9 @@ class IntraProcessManager /// Convert received message to message type of subscriber /** - * Method for ros message for serialization to rclcpp::SerializedMessage. + * Method to serialize a ros message to rclcpp::SerializedMessage. * The publisher has a template type while the subscribers is serialized. * - * In addition this generates a unique intra process id for the publisher. - * * \param message the ros message from publisher. * \param subscription a subscriber used for creating the serialized message. * \return a shared pointer (containing rclcpp::SerializedMessage) with deleter. @@ -623,8 +623,6 @@ class IntraProcessManager * Overloaded method for rclcpp::SerializedMessage for deserialization. * The publisher is serialized while the subscribers have a templated message type. * - * In addition this generates a unique intra process id for the publisher. - * * \param serialized_message the serialized message from publisher. * \param subscription a subscriber used for creating ros message and serialization. * \return a shared pointer (containing a ros message) with deleter.