From 91056b38b4e86a7c9bf3b7f5a238832d8d0b8297 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 19 Jul 2019 10:57:06 +0200 Subject: [PATCH 01/17] * enabled handling of serialized messages (including correct deletion) in mapped_ring_buffer * updated tests Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/mapped_ring_buffer.hpp | 122 ++++++++++++++++++- rclcpp/test/test_mapped_ring_buffer.cpp | 19 +-- 2 files changed, 130 insertions(+), 11 deletions(-) diff --git a/rclcpp/include/rclcpp/mapped_ring_buffer.hpp b/rclcpp/include/rclcpp/mapped_ring_buffer.hpp index c8fdf6493d..0578809000 100644 --- a/rclcpp/include/rclcpp/mapped_ring_buffer.hpp +++ b/rclcpp/include/rclcpp/mapped_ring_buffer.hpp @@ -21,11 +21,14 @@ #include #include #include +#include #include #include #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/macros.hpp" +#include "rclcpp/subscription_traits.hpp" +#include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" namespace rclcpp @@ -33,10 +36,64 @@ namespace rclcpp namespace mapped_ring_buffer { +/// Deleter for serialized messages +struct deallocate_rmw_serialized_message +{ + void operator()(rmw_serialized_message_t * msg) const + { + if (msg) { + int error = rmw_serialized_message_fini(msg); + delete msg; + if (error != RCUTILS_RET_OK) { + throw std::runtime_error("Leaking memory. Error: " + + std::string(rcutils_get_error_string().str)); + } + } + } +}; + class RCLCPP_PUBLIC MappedRingBufferBase { public: RCLCPP_SMART_PTR_DEFINITIONS(MappedRingBufferBase) + + using ConstVoidSharedPtr = std::shared_ptr; + + /// Returns true if the content is a serialized message. + virtual bool is_serialized() = 0; + + /// Create a mapped ring buffer for serialized messages with same allocator. + virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr + make_serialized_mapped_ring_buffer() const = 0; + + /// Returns type support of message type. For serialized case return nullptr. + virtual const rosidl_message_type_support_t * get_type_support() const = 0; + + /// Give the ownership of the stored value to the caller, at the given key. + /** + * The key is matched if an element in the ring buffer has a matching key. + * + * The key is not guaranteed to be unique, see the class docs for more. + * + * The contents of value before the method is called are discarded. + * + * \param key the key associated with the stored value + * \param value if the key is found, the value is stored in this parameter + */ + virtual void pop(uint64_t key, ConstVoidSharedPtr & value) = 0; + + /// Share ownership of the value stored in the ring buffer at the given key. + /** + * The key is matched if an element in the ring buffer has a matching key. + * + * The key is not guaranteed to be unique, see the class docs for more. + * + * The contents of value before the method is called are discarded. + * + * \param key the key associated with the stored value + * \param value if the key is found, the value is stored in this parameter + */ + virtual void get(uint64_t key, ConstVoidSharedPtr & value) = 0; }; /// Ring buffer container of shared_ptr's or unique_ptr's of T, which can be accessed by a key. @@ -75,8 +132,10 @@ class MappedRingBuffer : public MappedRingBufferBase * \param size size of the ring buffer; must be positive and non-zero. * \param allocator optional custom allocator */ - explicit MappedRingBuffer(size_t size, std::shared_ptr allocator = nullptr) - : elements_(size), head_(0) + explicit MappedRingBuffer( + size_t size, const rosidl_message_type_support_t * type_support, + std::shared_ptr allocator = nullptr) + : elements_(size), head_(0), type_support_(type_support) { if (size == 0) { throw std::invalid_argument("size must be a positive, non-zero value"); @@ -129,6 +188,13 @@ class MappedRingBuffer : public MappedRingBufferBase } } + /// Check if content is serialized. + bool + is_serialized() override + { + return type_support_ == nullptr; + } + /// Share ownership of the value stored in the ring buffer at the given key. /** * The key is matched if an element in the ring buffer has a matching key. @@ -275,6 +341,40 @@ class MappedRingBuffer : public MappedRingBufferBase return did_replace; } + /// Give the ownership of the stored value to the caller, at the given key. + /** + * The key is matched if an element in the ring buffer has a matching key. + * + * The key is not guaranteed to be unique, see the class docs for more. + * + * The contents of value before the method is called are discarded. + * + * \param key the key associated with the stored value + * \param value if the key is found, the value is stored in this parameter + */ + void pop(uint64_t key, ConstVoidSharedPtr & value) override + { + pop(key, (ConstElemSharedPtr &)value); + } + + + /// Share ownership of the value stored in the ring buffer at the given key. + /** + * The key is matched if an element in the ring buffer has a matching key. + * + * The key is not guaranteed to be unique, see the class docs for more. + * + * The contents of value before the method is called are discarded. + * + * \param key the key associated with the stored value + * \param value if the key is found, the value is stored in this parameter + */ + void get(uint64_t key, ConstVoidSharedPtr & value) override + { + get(key, (ConstElemSharedPtr &)value); + } + + /// Return true if the key is found in the ring buffer, otherwise false. bool has_key(uint64_t key) @@ -283,6 +383,23 @@ class MappedRingBuffer : public MappedRingBufferBase return elements_.end() != get_iterator_of_key(key); } + /// get type support for stored messages + const rosidl_message_type_support_t * + get_type_support() const override + { + return type_support_; + } + + /// create a corresponding ring buffer for serialized messages of type rmw_serialized_message_t + mapped_ring_buffer::MappedRingBufferBase::SharedPtr + make_serialized_mapped_ring_buffer() const override + { + return mapped_ring_buffer::MappedRingBuffer< + rmw_serialized_message_t, + typename std::allocator_traits::template rebind_alloc + >::make_shared(elements_.size(), nullptr); + } + private: RCLCPP_DISABLE_COPY(MappedRingBuffer) @@ -311,6 +428,7 @@ class MappedRingBuffer : public MappedRingBufferBase size_t head_; std::shared_ptr allocator_; std::mutex data_mutex_; + const rosidl_message_type_support_t * type_support_; }; } // namespace mapped_ring_buffer diff --git a/rclcpp/test/test_mapped_ring_buffer.cpp b/rclcpp/test/test_mapped_ring_buffer.cpp index 5625804d13..e87baa4a11 100644 --- a/rclcpp/test/test_mapped_ring_buffer.cpp +++ b/rclcpp/test/test_mapped_ring_buffer.cpp @@ -25,9 +25,10 @@ */ TEST(TestMappedRingBuffer, empty) { // Cannot create a buffer of size zero. - EXPECT_THROW(rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(0), std::invalid_argument); + EXPECT_THROW(rclcpp::mapped_ring_buffer::MappedRingBuffer + mrb(0, nullptr), std::invalid_argument); // Getting or popping an empty buffer should result in a nullptr. - rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(1); + rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(1, nullptr); std::unique_ptr unique; mrb.get(1, unique); @@ -49,7 +50,7 @@ TEST(TestMappedRingBuffer, empty) { get and pop methods with shared_ptr signature. */ TEST(TestMappedRingBuffer, temporary_l_value_with_shared_get_pop) { - rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); + rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2, nullptr); // Pass in value with temporary object mrb.push_and_replace(1, std::shared_ptr(new char('a'))); @@ -69,7 +70,7 @@ TEST(TestMappedRingBuffer, temporary_l_value_with_shared_get_pop) { get and pop methods with unique_ptr signature. */ TEST(TestMappedRingBuffer, temporary_l_value_with_unique_get_pop) { - rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); + rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2, nullptr); // Pass in value with temporary object mrb.push_and_replace(1, std::shared_ptr(new char('a'))); @@ -89,7 +90,7 @@ TEST(TestMappedRingBuffer, temporary_l_value_with_unique_get_pop) { Using shared push_and_replace, get and pop methods. */ TEST(TestMappedRingBuffer, nominal_push_shared_get_pop_shared) { - rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); + rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2, nullptr); std::shared_ptr expected(new char('a')); EXPECT_FALSE(mrb.push_and_replace(1, expected)); @@ -145,7 +146,7 @@ TEST(TestMappedRingBuffer, nominal_push_shared_get_pop_shared) { Using shared push_and_replace, unique get and pop methods. */ TEST(TestMappedRingBuffer, nominal_push_shared_get_pop_unique) { - rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); + rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2, nullptr); std::shared_ptr expected(new char('a')); const char * expected_orig = expected.get(); @@ -207,7 +208,7 @@ TEST(TestMappedRingBuffer, nominal_push_shared_get_pop_unique) { Using unique push_and_replace, get and pop methods. */ TEST(TestMappedRingBuffer, nominal_push_unique_get_pop_unique) { - rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); + rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2, nullptr); std::unique_ptr expected(new char('a')); const char * expected_orig = expected.get(); @@ -258,7 +259,7 @@ TEST(TestMappedRingBuffer, nominal_push_unique_get_pop_unique) { Using unique push_and_replace, shared get and pop methods. */ TEST(TestMappedRingBuffer, nominal_push_unique_get_pop_shared) { - rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); + rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2, nullptr); std::unique_ptr expected(new char('a')); const char * expected_orig = expected.get(); @@ -308,7 +309,7 @@ TEST(TestMappedRingBuffer, nominal_push_unique_get_pop_shared) { Tests the affect of reusing keys (non-unique keys) in a mrb. */ TEST(TestMappedRingBuffer, non_unique_keys) { - rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); + rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2, nullptr); std::shared_ptr input(new char('a')); mrb.push_and_replace(1, input); From 6b24f10c514a581ae17a63bb166b8e31ddcb837a Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 19 Jul 2019 10:58:40 +0200 Subject: [PATCH 02/17] * added specialization for serialized messages in intraprocess manager * added (de)serializer * updated tests Signed-off-by: Joshua Hampp --- .../include/rclcpp/intra_process_manager.hpp | 319 ++++++++++++++++-- .../rclcpp/intra_process_manager_impl.hpp | 46 ++- rclcpp/test/test_intra_process_manager.cpp | 5 +- 3 files changed, 327 insertions(+), 43 deletions(-) diff --git a/rclcpp/include/rclcpp/intra_process_manager.hpp b/rclcpp/include/rclcpp/intra_process_manager.hpp index 5d3abed3f6..e10fe366f0 100644 --- a/rclcpp/include/rclcpp/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager.hpp @@ -15,6 +15,7 @@ #ifndef RCLCPP__INTRA_PROCESS_MANAGER_HPP_ #define RCLCPP__INTRA_PROCESS_MANAGER_HPP_ +#include #include #include @@ -27,6 +28,7 @@ #include #include #include +#include #include "rclcpp/allocator/allocator_deleter.hpp" #include "rclcpp/intra_process_manager_impl.hpp" @@ -241,10 +243,14 @@ class IntraProcessManager { using MRBMessageAlloc = typename std::allocator_traits::template rebind_alloc; using TypedMRB = typename mapped_ring_buffer::MappedRingBuffer; + constexpr bool serialized = + rclcpp::subscription_traits::is_serialized_subscription_argument::value; uint64_t message_seq = 0; mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->get_publisher_info_for_id( - intra_process_publisher_id, message_seq); - typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(buffer); + intra_process_publisher_id, + message_seq, + serialized); + typename TypedMRB::SharedPtr typed_buffer = std::dynamic_pointer_cast(buffer); if (!typed_buffer) { throw std::runtime_error("Typecast failed due to incorrect message type"); } @@ -254,7 +260,7 @@ class IntraProcessManager // TODO(wjwwood): do something when a message was displaced. log debug? (void)did_replace; // Avoid unused variable warning. - impl_->store_intra_process_message(intra_process_publisher_id, message_seq); + impl_->store_intra_process_message(intra_process_publisher_id, message_seq, serialized); // Return the message sequence which is sent to the subscription. return message_seq; @@ -262,7 +268,10 @@ class IntraProcessManager template< typename MessageT, typename Alloc = std::allocator, - typename Deleter = std::default_delete> + typename Deleter = std::conditional< + rclcpp::subscription_traits::is_serialized_subscription_argument::value, + mapped_ring_buffer::deallocate_rmw_serialized_message, + std::default_delete>> uint64_t store_intra_process_message( uint64_t intra_process_publisher_id, @@ -270,10 +279,14 @@ class IntraProcessManager { using MRBMessageAlloc = typename std::allocator_traits::template rebind_alloc; using TypedMRB = typename mapped_ring_buffer::MappedRingBuffer; + constexpr bool serialized = + rclcpp::subscription_traits::is_serialized_subscription_argument::value; uint64_t message_seq = 0; mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->get_publisher_info_for_id( - intra_process_publisher_id, message_seq); - typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(buffer); + intra_process_publisher_id, + message_seq, + serialized); + typename TypedMRB::SharedPtr typed_buffer = std::dynamic_pointer_cast(buffer); if (!typed_buffer) { throw std::runtime_error("Typecast failed due to incorrect message type"); } @@ -283,12 +296,149 @@ class IntraProcessManager // TODO(wjwwood): do something when a message was displaced. log debug? (void)did_replace; // Avoid unused variable warning. - impl_->store_intra_process_message(intra_process_publisher_id, message_seq); + impl_->store_intra_process_message(intra_process_publisher_id, message_seq, serialized); // Return the message sequence which is sent to the subscription. return message_seq; } + /// Helper to deserialize message from rmw_serialized_message_t to some message type + template + struct Deserializer + { + using ElemAllocTraits = allocator::AllocRebind; + using ElemAlloc = typename ElemAllocTraits::allocator_type; + using MessageAllocTraits = allocator::AllocRebind; + + MessageT * deserialize(const rmw_serialized_message_t * serialized_msg) + { + if (!serialized_msg || + serialized_msg->buffer_capacity == 0 || + serialized_msg->buffer_length == 0 || + serialized_msg->buffer == nullptr) + { + throw std::runtime_error("failed to deserialize nullptr serialized message"); + } + + ElemAlloc allocator; + auto ptr = MessageAllocTraits::allocate(allocator, 1); + + auto msg_ts = + rosidl_typesupport_cpp::get_message_type_support_handle(); + + auto ret = rmw_deserialize(serialized_msg, msg_ts, ptr); + if (ret != RMW_RET_OK) { + throw std::runtime_error("failed to deserialize serialized message"); + } + + return ptr; + } + }; + + /// Helper to deserialize message from rmw_serialized_message_t when it should be kept serialized + template + struct Deserializer + { + rmw_serialized_message_t * deserialize(const rmw_serialized_message_t * serialized_msg) + { + if (!serialized_msg || + serialized_msg->buffer_capacity == 0 || + serialized_msg->buffer_length == 0 || + serialized_msg->buffer == nullptr) + { + throw std::runtime_error("failed to deserialize nullptr serialized message b"); + } + + return new rmw_serialized_message_t(*serialized_msg); + } + }; + + /// Serialize a message if needed. In this case just point to the content (same type). + template + struct Serializer + { + void serialize( + MessageTIn & data, + MessageTOutPtr & message, + const rosidl_message_type_support_t * type_support) + { + (void)type_support; + message = std::move(data); + } + }; + + /// Serialize a message if needed. rmw_serialized_message_t to a message type. + template + struct Serializer + { + void serialize( + MessageTIn & data, + MessageTOutPtr & message, + const rosidl_message_type_support_t * type_support) + { + message.reset(new rmw_serialized_message_t(serialize(data, type_support))); + } + + static rmw_serialized_message_t serialize( + const MessageTIn & msg, + const rosidl_message_type_support_t * type_support) + { + rcutils_allocator_t rcutils_allocator_ = rcutils_get_default_allocator(); + + auto serialized_message = rmw_get_zero_initialized_serialized_message(); + auto ret = rmw_serialized_message_init(&serialized_message, 0, &rcutils_allocator_); + if (ret != RCUTILS_RET_OK) { + throw std::runtime_error( + "Error allocating resources for serialized message: " + + std::string(rcutils_get_error_string().str)); + } + + if (!msg) { + return serialized_message; + } + + auto error = rmw_serialize( + msg.get(), + type_support, + &serialized_message); + if (error != RCL_RET_OK) { + throw std::runtime_error("Failed to serialize"); + } + + return serialized_message; + } + }; + + /// Not serializing a message if same type. + template + struct TypedSerializer + { + void serialize( + MessageTIn & data, + MessageTOutPtr & message, + const rosidl_message_type_support_t * type_support) + { + (void)type_support; + (void)message; + (void)data; + throw std::runtime_error("Serialization case not supported!"); + } + }; + + /// Serialize a message if needed. rmw_serialized_message_t to a message type. + template + struct TypedSerializer + { + void serialize( + MessageTIn & data, + MessageTOutPtr & message, + const rosidl_message_type_support_t * type_support) + { + Serializer s; + message.reset(new rmw_serialized_message_t(s.serialize(data, type_support))); + } + }; + /// Take an intra process message. /** * The intra_process_publisher_id and message_sequence_number parameters @@ -326,7 +476,10 @@ class IntraProcessManager */ template< typename MessageT, typename Alloc = std::allocator, - typename Deleter = std::default_delete> + typename Deleter = std::conditional< + rclcpp::subscription_traits::is_serialized_subscription_argument::value, + mapped_ring_buffer::deallocate_rmw_serialized_message, + std::default_delete>> void take_intra_process_message( uint64_t intra_process_publisher_id, @@ -334,8 +487,6 @@ class IntraProcessManager uint64_t requesting_subscriptions_intra_process_id, std::unique_ptr & message) { - using MRBMessageAlloc = typename std::allocator_traits::template rebind_alloc; - using TypedMRB = mapped_ring_buffer::MappedRingBuffer; message = nullptr; size_t target_subs_size = 0; @@ -346,17 +497,83 @@ class IntraProcessManager requesting_subscriptions_intra_process_id, target_subs_size ); - typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(buffer); - if (!typed_buffer) { + + if (buffer == nullptr) { return; } - // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left. - if (target_subs_size) { - // There are more subscriptions to serve, return a copy. - typed_buffer->get(message_sequence_number, message); + + if (buffer->is_serialized()) { + using MRBMessageAlloc = typename std::allocator_traits + >::template rebind_alloc; + using TypedMRB = + mapped_ring_buffer::MappedRingBuffer; + typename TypedMRB::SharedPtr typed_buffer = std::dynamic_pointer_cast(buffer); + if (!typed_buffer) { + return; + } + + typename TypedMRB::ElemUniquePtr serialized_msg; + + // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left. + if (target_subs_size) { + // There are more subscriptions to serve, return a copy. + typed_buffer->get(message_sequence_number, serialized_msg); + } else { + // This is the last one to be returned, transfer ownership. + typed_buffer->pop(message_sequence_number, serialized_msg); + } + + Deserializer deserializer; + message.reset(deserializer.deserialize(serialized_msg.get())); + if (rclcpp::subscription_traits::is_serialized_subscription_argument::value) { + serialized_msg.release(); + } + } else if (rclcpp::subscription_traits::is_serialized_subscription_argument::value) { + mapped_ring_buffer::MappedRingBufferBase::ConstVoidSharedPtr stored_msg; + + // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left. + if (target_subs_size) { + // There are more subscriptions to serve, return a copy. + buffer->get(message_sequence_number, stored_msg); + } else { + // This is the last one to be returned, transfer ownership. + buffer->pop(message_sequence_number, stored_msg); + } + if (!stored_msg) { + return; + } + + TypedSerializer< + mapped_ring_buffer::MappedRingBufferBase::ConstVoidSharedPtr, + std::unique_ptr, + MessageT> serializer; + serializer.serialize(stored_msg, message, buffer->get_type_support()); } else { - // This is the last one to be returned, transfer ownership. - typed_buffer->pop(message_sequence_number, message); + using MRBMessageAlloc = + typename std::allocator_traits::template rebind_alloc; + using TypedMRB = mapped_ring_buffer::MappedRingBuffer; + typename TypedMRB::SharedPtr typed_buffer = std::dynamic_pointer_cast(buffer); + if (!typed_buffer) { + return; + } + + typename TypedMRB::ElemUniquePtr stored_msg; + + // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left. + if (target_subs_size) { + // There are more subscriptions to serve, return a copy. + typed_buffer->get(message_sequence_number, stored_msg); + } else { + // This is the last one to be returned, transfer ownership. + typed_buffer->pop(message_sequence_number, stored_msg); + } + if (!stored_msg) { + return; + } + + Serializer, MessageT> + serializer; + serializer.serialize(stored_msg, message, typed_buffer->get_type_support()); } } @@ -369,29 +586,71 @@ class IntraProcessManager uint64_t requesting_subscriptions_intra_process_id, std::shared_ptr & message) { - using MRBMessageAlloc = typename std::allocator_traits::template rebind_alloc; - using TypedMRB = mapped_ring_buffer::MappedRingBuffer; message = nullptr; size_t target_subs_size = 0; std::lock_guard lock(take_mutex_); - mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->take_intra_process_message( + mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = + impl_->take_intra_process_message( intra_process_publisher_id, message_sequence_number, requesting_subscriptions_intra_process_id, target_subs_size - ); - typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(buffer); - if (!typed_buffer) { + ); + + if (buffer == nullptr) { return; } - // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left. - if (target_subs_size) { - // There are more subscriptions to serve, return a copy. - typed_buffer->get(message_sequence_number, message); + + if (buffer->is_serialized()) { + using MRBMessageAlloc = typename std::allocator_traits + >::template rebind_alloc; + using TypedMRB = + mapped_ring_buffer::MappedRingBuffer; + typename TypedMRB::SharedPtr typed_buffer = std::dynamic_pointer_cast(buffer); + if (!typed_buffer) { + return; + } + + std::shared_ptr serialized_msg; + + // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left. + if (target_subs_size) { + // There are more subscriptions to serve, return a copy. + typed_buffer->get(message_sequence_number, serialized_msg); + } else { + // This is the last one to be returned, transfer ownership. + typed_buffer->pop(message_sequence_number, serialized_msg); + } + + Deserializer deserializer; + message.reset(deserializer.deserialize(serialized_msg.get())); } else { - // This is the last one to be returned, transfer ownership. - typed_buffer->pop(message_sequence_number, message); + using MRBMessageAlloc = typename std::allocator_traits + ::template rebind_alloc; + using TypedMRB = mapped_ring_buffer::MappedRingBuffer; + typename TypedMRB::SharedPtr typed_buffer = std::dynamic_pointer_cast(buffer); + if (!typed_buffer) { + return; + } + + std::shared_ptr stored_msg; + + // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left. + if (target_subs_size) { + // There are more subscriptions to serve, return a copy. + typed_buffer->get(message_sequence_number, stored_msg); + } else { + // This is the last one to be returned, transfer ownership. + typed_buffer->pop(message_sequence_number, stored_msg); + } + if (!stored_msg) { + return; + } + + Serializer, std::shared_ptr, MessageT> + serializer; + serializer.serialize(stored_msg, message, typed_buffer->get_type_support()); } } diff --git a/rclcpp/include/rclcpp/intra_process_manager_impl.hpp b/rclcpp/include/rclcpp/intra_process_manager_impl.hpp index ab29af7b92..c4759d5a9c 100644 --- a/rclcpp/include/rclcpp/intra_process_manager_impl.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager_impl.hpp @@ -69,10 +69,14 @@ class IntraProcessManagerImplBase virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr get_publisher_info_for_id( uint64_t intra_process_publisher_id, - uint64_t & message_seq) = 0; + uint64_t & message_seq, + const bool serialized) = 0; virtual void - store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq) = 0; + store_intra_process_message( + uint64_t intra_process_publisher_id, + uint64_t message_seq, + const bool serialized) = 0; virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr take_intra_process_message( @@ -116,7 +120,7 @@ class IntraProcessManagerImpl : public IntraProcessManagerImplBase // remove references to this subscription's id. for (auto & publisher_pair : publishers_) { for (auto & sub_pair : publisher_pair.second.target_subscriptions_by_message_sequence) { - sub_pair.second.erase(intra_process_subscription_id); + sub_pair.second.first.erase(intra_process_subscription_id); } } } @@ -135,6 +139,7 @@ class IntraProcessManagerImpl : public IntraProcessManagerImplBase publishers_[id].sequence_number.store(0); publishers_[id].buffer = mrb; + publishers_[id].buffer_serialized = mrb->make_serialized_mapped_ring_buffer(); publishers_[id].target_subscriptions_by_message_sequence.reserve(size); } @@ -148,7 +153,8 @@ class IntraProcessManagerImpl : public IntraProcessManagerImplBase mapped_ring_buffer::MappedRingBufferBase::SharedPtr get_publisher_info_for_id( uint64_t intra_process_publisher_id, - uint64_t & message_seq) + uint64_t & message_seq, + const bool serialized) { std::lock_guard lock(runtime_mutex_); auto it = publishers_.find(intra_process_publisher_id); @@ -159,11 +165,17 @@ class IntraProcessManagerImpl : public IntraProcessManagerImplBase // Calculate the next message sequence number. message_seq = info.sequence_number.fetch_add(1); + if (serialized) { + return info.buffer_serialized; + } return info.buffer; } void - store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq) + store_intra_process_message( + uint64_t intra_process_publisher_id, + uint64_t message_seq, + const bool serialized) { std::lock_guard lock(runtime_mutex_); auto it = publishers_.find(intra_process_publisher_id); @@ -182,17 +194,19 @@ class IntraProcessManagerImpl : public IntraProcessManagerImplBase // Store the list for later comparison. if (info.target_subscriptions_by_message_sequence.count(message_seq) == 0) { info.target_subscriptions_by_message_sequence.emplace( - message_seq, AllocSet(std::less(), uint64_allocator)); + message_seq, + std::make_pair(AllocSet(std::less(), uint64_allocator), + serialized)); } else { - info.target_subscriptions_by_message_sequence[message_seq].clear(); + info.target_subscriptions_by_message_sequence[message_seq].first.clear(); } std::copy( destined_subscriptions.begin(), destined_subscriptions.end(), // Memory allocation occurs in info.target_subscriptions_by_message_sequence[message_seq] std::inserter( - info.target_subscriptions_by_message_sequence[message_seq], + info.target_subscriptions_by_message_sequence[message_seq].first, // This ends up only being a hint to std::set, could also be .begin(). - info.target_subscriptions_by_message_sequence[message_seq].end() + info.target_subscriptions_by_message_sequence[message_seq].first.end() ) ); } @@ -217,13 +231,15 @@ class IntraProcessManagerImpl : public IntraProcessManagerImplBase } // Figure out how many subscriptions are left. AllocSet * target_subs; + bool serialized = false; { auto it = info->target_subscriptions_by_message_sequence.find(message_sequence_number); if (it == info->target_subscriptions_by_message_sequence.end()) { // Message is no longer being stored by this publisher. return 0; } - target_subs = &it->second; + target_subs = &it->second.first; + serialized = it->second.second; } { auto it = std::find( @@ -236,6 +252,10 @@ class IntraProcessManagerImpl : public IntraProcessManagerImplBase target_subs->erase(it); } size = target_subs->size(); + // did we store a serialized message? + if (serialized) { + return info->buffer_serialized; + } return info->buffer; } @@ -330,12 +350,14 @@ class IntraProcessManagerImpl : public IntraProcessManagerImplBase PublisherBase::WeakPtr publisher; std::atomic sequence_number; mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer; + mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer_serialized; using TargetSubscriptionsMap = std::unordered_map< - uint64_t, AllocSet, + uint64_t, std::pair, std::hash, std::equal_to, - RebindAlloc>>; + RebindAlloc>>>; TargetSubscriptionsMap target_subscriptions_by_message_sequence; + TargetSubscriptionsMap target_subscriptions_by_message_sequence_serialized; }; using PublisherMap = std::unordered_map< diff --git a/rclcpp/test/test_intra_process_manager.cpp b/rclcpp/test/test_intra_process_manager.cpp index fe3c535107..e2964c5a86 100644 --- a/rclcpp/test/test_intra_process_manager.cpp +++ b/rclcpp/test/test_intra_process_manager.cpp @@ -91,7 +91,10 @@ class Publisher : public PublisherBase return mapped_ring_buffer::MappedRingBuffer< T, typename Publisher::MessageAlloc - >::make_shared(size, allocator_); + >::make_shared( + size, + rosidl_typesupport_cpp::get_message_type_support_handle(), + allocator_); } std::shared_ptr get_allocator() From f805333271e4bad53e22a4b984a6a68fa6ac4892 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 19 Jul 2019 10:59:31 +0200 Subject: [PATCH 03/17] * updated publisher for serialized messages (to enable this in publisher_base without type info) Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/publisher.hpp | 58 ++------------------ rclcpp/include/rclcpp/publisher_base.hpp | 17 ++++++ rclcpp/src/rclcpp/publisher_base.cpp | 69 ++++++++++++++++++++++++ 3 files changed, 91 insertions(+), 53 deletions(-) diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index a0bd342cc9..582fa226e9 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -46,6 +46,7 @@ template> class Publisher : public PublisherBase { public: + using PublisherBase::publish; using MessageAllocTraits = allocator::AllocRebind; using MessageAlloc = typename MessageAllocTraits::allocator_type; using MessageDeleter = allocator::Deleter; @@ -88,7 +89,10 @@ class Publisher : public PublisherBase return mapped_ring_buffer::MappedRingBuffer< MessageT, typename Publisher::MessageAlloc - >::make_shared(size, this->get_allocator()); + >::make_shared( + size, + rosidl_typesupport_cpp::get_message_type_support_handle(), + this->get_allocator()); } /// Send a message to the topic for this publisher. @@ -170,23 +174,6 @@ class Publisher : public PublisherBase return this->publish(*msg); } - void - publish(const rcl_serialized_message_t & serialized_msg) - { - return this->do_serialized_publish(&serialized_msg); - } - -// Skip deprecated attribute in windows, as it raise a warning in template specialization. -#if !defined(_WIN32) - [[deprecated( - "Use publish(*serialized_msg). Check against nullptr before calling if necessary.")]] -#endif - void - publish(const rcl_serialized_message_t * serialized_msg) - { - return this->do_serialized_publish(serialized_msg); - } - // Skip deprecated attribute in windows, as it raise a warning in template specialization. #if !defined(_WIN32) [[deprecated( @@ -223,41 +210,6 @@ class Publisher : public PublisherBase } } - void - do_serialized_publish(const rcl_serialized_message_t * serialized_msg) - { - if (intra_process_is_enabled_) { - // TODO(Karsten1987): support serialized message passed by intraprocess - throw std::runtime_error("storing serialized messages in intra process is not supported yet"); - } - auto status = rcl_publish_serialized_message(&publisher_handle_, serialized_msg, nullptr); - if (RCL_RET_OK != status) { - rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message"); - } - } - - void - do_intra_process_publish(uint64_t message_seq) - { - rcl_interfaces::msg::IntraProcessMessage ipm; - ipm.publisher_id = intra_process_publisher_id_; - ipm.message_sequence = message_seq; - auto status = rcl_publish(&intra_process_publisher_handle_, &ipm, nullptr); - if (RCL_RET_PUBLISHER_INVALID == status) { - rcl_reset_error(); // next call will reset error message if not context - if (rcl_publisher_is_valid_except_context(&intra_process_publisher_handle_)) { - rcl_context_t * context = rcl_publisher_get_context(&intra_process_publisher_handle_); - if (nullptr != context && !rcl_context_is_valid(context)) { - // publisher is invalid due to context being shutdown - return; - } - } - } - if (RCL_RET_OK != status) { - rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish intra process message"); - } - } - uint64_t store_intra_process_message( uint64_t publisher_id, diff --git a/rclcpp/include/rclcpp/publisher_base.hpp b/rclcpp/include/rclcpp/publisher_base.hpp index afb68302f8..e13c263d4d 100644 --- a/rclcpp/include/rclcpp/publisher_base.hpp +++ b/rclcpp/include/rclcpp/publisher_base.hpp @@ -195,7 +195,24 @@ class PublisherBase IntraProcessManagerSharedPtr ipm, const rcl_publisher_options_t & intra_process_options); + void + publish(const rcl_serialized_message_t & serialized_msg); + +// Skip deprecated attribute in windows, as it raise a warning in template specialization. +#if !defined(_WIN32) + [[deprecated( + "Use publish(*serialized_msg). Check against nullptr before calling if necessary.")]] +#endif + void + publish(const rcl_serialized_message_t * serialized_msg); + protected: + void + do_serialized_publish(const rcl_serialized_message_t * serialized_msg); + + void + do_intra_process_publish(uint64_t message_seq); + template void add_event_handler( diff --git a/rclcpp/src/rclcpp/publisher_base.cpp b/rclcpp/src/rclcpp/publisher_base.cpp index a2c6ceac91..d8a4abe692 100644 --- a/rclcpp/src/rclcpp/publisher_base.cpp +++ b/rclcpp/src/rclcpp/publisher_base.cpp @@ -316,3 +316,72 @@ PublisherBase::setup_intra_process( throw std::runtime_error(msg); } } + +void +PublisherBase::publish(const rcl_serialized_message_t & serialized_msg) +{ + return this->do_serialized_publish(&serialized_msg); +} + +// Skip deprecated attribute in windows, as it raise a warning in template specialization. +#if !defined(_WIN32) +[[deprecated( + "Use publish(*serialized_msg). Check against nullptr before calling if necessary.")]] +#endif +void +PublisherBase::publish(const rcl_serialized_message_t * serialized_msg) +{ + return this->do_serialized_publish(serialized_msg); +} + +void +PublisherBase::do_serialized_publish(const rcl_serialized_message_t * serialized_msg) +{ + if (!serialized_msg) { + throw std::runtime_error("cannot publisher msg which is a null pointer"); + } + + bool inter_process_publish_needed = + get_subscription_count() > get_intra_process_subscription_count(); + + if (intra_process_is_enabled_) { + auto ipm = weak_ipm_.lock(); + if (!ipm) { + throw std::runtime_error( + "intra process publish called after destruction of intra process manager"); + } + const uint64_t message_seq = + ipm->template store_intra_process_message( + intra_process_publisher_id_, std::make_unique(*serialized_msg)); + this->do_intra_process_publish(message_seq); + } + + if (inter_process_publish_needed) { + auto status = rcl_publish_serialized_message(&publisher_handle_, serialized_msg, nullptr); + if (RCL_RET_OK != status) { + rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message"); + } + } +} + +void +PublisherBase::do_intra_process_publish(uint64_t message_seq) +{ + rcl_interfaces::msg::IntraProcessMessage ipm; + ipm.publisher_id = intra_process_publisher_id_; + ipm.message_sequence = message_seq; + auto status = rcl_publish(&intra_process_publisher_handle_, &ipm, nullptr); + if (RCL_RET_PUBLISHER_INVALID == status) { + rcl_reset_error(); // next call will reset error message if not context + if (rcl_publisher_is_valid_except_context(&intra_process_publisher_handle_)) { + rcl_context_t * context = rcl_publisher_get_context(&intra_process_publisher_handle_); + if (nullptr != context && !rcl_context_is_valid(context)) { + // publisher is invalid due to context being shutdown + return; + } + } + } + if (RCL_RET_OK != status) { + rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish intra process message"); + } +} From cfc380231c85ccf2e9f497cce65cf958b9ac2b3a Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 19 Jul 2019 10:59:53 +0200 Subject: [PATCH 04/17] * added unit test for serialized intraprocess communication Signed-off-by: Joshua Hampp --- rclcpp/CMakeLists.txt | 12 + .../test/test_intra_process_communication.cpp | 269 ++++++++++++++++++ 2 files changed, 281 insertions(+) create mode 100644 rclcpp/test/test_intra_process_communication.cpp diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 07a67622b0..06e4ab9797 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -208,6 +208,18 @@ if(BUILD_TESTING) "rosidl_typesupport_cpp" ) endif() + ament_add_gtest(test_intra_process_communication test/test_intra_process_communication.cpp) + if(TARGET test_intra_process_communication) + ament_target_dependencies(test_intra_process_communication + "rcl" + "rcl_interfaces" + "rmw" + "rosidl_generator_cpp" + "rosidl_typesupport_cpp" + "test_msgs" + ) + endif() + target_link_libraries(test_intra_process_communication ${PROJECT_NAME}) ament_add_gtest(test_node test/test_node.cpp) if(TARGET test_node) ament_target_dependencies(test_node diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp new file mode 100644 index 0000000000..b66b4a31bb --- /dev/null +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -0,0 +1,269 @@ +// Copyright 2019 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include +#include +#include +#include + +#include "rclcpp/exceptions.hpp" +#include "rclcpp/publisher.hpp" +#include "rclcpp/rclcpp.hpp" + + +static int g_counter = 0; + +std::shared_ptr make_serialized_string_msg( + const std::shared_ptr & stringMsg) +{ + auto m_allocator = rcutils_get_default_allocator(); + size_t message_size = 80u + static_cast(sizeof(rcl_interfaces::msg::IntraProcessMessage)); + + auto msg = new rcutils_uint8_array_t; + *msg = rcutils_get_zero_initialized_uint8_array(); + auto ret = rcutils_uint8_array_init(msg, message_size, &m_allocator); + if (ret != RCUTILS_RET_OK) { + throw std::runtime_error("Error allocating resources " + std::to_string(ret)); + } + + ++g_counter; + auto serialized_data = std::shared_ptr( + msg, + [](rcutils_uint8_array_t * msg) { + --g_counter; + int error = rcutils_uint8_array_fini(msg); + delete msg; + if (error != RCUTILS_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "m4_test", + "Leaking memory %i", + error); + } + }); + + serialized_data->buffer_length = message_size; + + static auto imageTypeSupport = + rosidl_typesupport_cpp::get_message_type_support_handle + (); + auto error = rmw_serialize(stringMsg.get(), imageTypeSupport, serialized_data.get()); + if (error != RMW_RET_OK) { + RCUTILS_LOG_ERROR_NAMED("m4_test", "Something went wrong preparing the serialized message"); + } + + return serialized_data; +} + +/** + * Parameterized test. + * The first param are the NodeOptions used to create the nodes. + * The second param are the expect intraprocess count results. + */ +struct TestParameters +{ + rclcpp::NodeOptions node_options[2]; + uint64_t intraprocess_count_results[2]; + std::string description; +}; + +std::ostream & operator<<(std::ostream & out, const TestParameters & params) +{ + out << params.description; + return out; +} + +class TestPublisherSubscriptionSerialized : public ::testing::TestWithParam +{ +public: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } + +protected: + void SetUp() {} + + void TearDown() {} + + static std::chrono::milliseconds offset; +}; + +std::chrono::milliseconds TestPublisherSubscriptionSerialized::offset = std::chrono::milliseconds( + 2000); +std::array counts; + +void OnMessageSerialized(const std::shared_ptr msg) +{ + EXPECT_NE(msg->buffer, nullptr); + EXPECT_GT(msg->buffer_capacity, 0u); + + ++counts[0]; +} + +void OnMessageSerializedU(std::unique_ptr msg) +{ + EXPECT_NE(msg, nullptr); + EXPECT_NE(msg->buffer, nullptr); + EXPECT_GT(msg->buffer_capacity, 0u); + + ++counts[0]; +} + +void OnMessageConst(std::shared_ptr msg) +{ + EXPECT_EQ(msg->message_sequence, 1234u); + + ++counts[1]; +} + +void OnMessageU(std::unique_ptr msg) +{ + EXPECT_EQ(msg->message_sequence, 1234u); + + ++counts[1]; +} + +void OnMessage(std::shared_ptr msg) +{ + EXPECT_EQ(msg->message_sequence, 1234u); + + ++counts[1]; +} + +TEST_P(TestPublisherSubscriptionSerialized, publish_serialized) +{ + TestParameters parameters = GetParam(); + rclcpp::Node::SharedPtr node = std::make_shared( + "my_node", + "/ns", + parameters.node_options[0]); + auto publisher = node->create_publisher("/topic", 10); + + auto mem_strategy = + rclcpp::message_memory_strategy::MessageMemoryStrategy + ::create_default(); + + auto stringMsg = std::make_shared(); + stringMsg->message_sequence = 1234u; + + { + auto msg0 = make_serialized_string_msg(stringMsg); + + auto sub = node->create_subscription("/topic", 10, + &OnMessage); + auto sub1 = node->create_subscription("/topic", 10, + &OnMessageU); + auto sub2 = node->create_subscription("/topic", 10, + &OnMessageConst); + auto sub_ser = node->create_subscription("/topic", 10, + &OnMessageSerialized); + auto sub_ser2 = node->create_subscription("/topic", + 10, + &OnMessageSerialized); + auto sub_ser3 = node->create_subscription("/topic", + 10, + &OnMessageSerializedU); + + rclcpp::sleep_for(offset); + + counts.fill(0); + + std::unique_ptr stringMsgU( + new rcl_interfaces::msg::IntraProcessMessage( + *stringMsg)); + std::unique_ptr msg0U(new rcutils_uint8_array_t(*msg0)); + + publisher->publish(msg0); + publisher->publish(stringMsg); + publisher->publish(std::move(msg0U)); + publisher->publish(std::move(stringMsgU)); + + rclcpp::spin_some(node); + rclcpp::sleep_for(offset); + + rclcpp::spin_some(node); + + EXPECT_EQ(counts[0], 12u); + EXPECT_EQ(counts[1], 12u); + } + + EXPECT_EQ(g_counter, 0); +} + +auto get_new_context() +{ + auto context = rclcpp::Context::make_shared(); + context->init(0, nullptr); + return context; +} + +TestParameters parameters[] = { + /* + Testing publisher subscription count api and internal process subscription count. + Two subscriptions in the same topic, both using intraprocess comm. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(true), + rclcpp::NodeOptions().use_intra_process_comms(true) + }, + {1u, 2u}, + "two_subscriptions_intraprocess_comm" + }, + /* + Testing publisher subscription count api and internal process subscription count. + Two subscriptions, one using intra-process comm and the other not using it. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(true), + rclcpp::NodeOptions().use_intra_process_comms(false) + }, + {1u, 1u}, + "two_subscriptions_one_intraprocess_one_not" + }, + /* + Testing publisher subscription count api and internal process subscription count. + Two contexts, both using intra-process. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(true), + rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(true) + }, + {1u, 1u}, + "two_subscriptions_in_two_contexts_with_intraprocess_comm" + }, + /* + Testing publisher subscription count api and internal process subscription count. + Two contexts, both of them not using intra-process comm. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(false), + rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(false) + }, + {0u, 0u}, + "two_subscriptions_in_two_contexts_without_intraprocess_comm" + } +}; + +INSTANTIATE_TEST_CASE_P( + TestWithDifferentNodeOptions, TestPublisherSubscriptionSerialized, + ::testing::ValuesIn(parameters), + ::testing::PrintToStringParamName()); From 91aa3b54ce72bac07358a0e3842f36c17ceecc76 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 5 Aug 2019 11:24:41 +0200 Subject: [PATCH 05/17] removed unused delcaration Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/intra_process_manager.hpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rclcpp/include/rclcpp/intra_process_manager.hpp b/rclcpp/include/rclcpp/intra_process_manager.hpp index e10fe366f0..efaf269319 100644 --- a/rclcpp/include/rclcpp/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager.hpp @@ -308,7 +308,6 @@ class IntraProcessManager { using ElemAllocTraits = allocator::AllocRebind; using ElemAlloc = typename ElemAllocTraits::allocator_type; - using MessageAllocTraits = allocator::AllocRebind; MessageT * deserialize(const rmw_serialized_message_t * serialized_msg) { @@ -321,7 +320,7 @@ class IntraProcessManager } ElemAlloc allocator; - auto ptr = MessageAllocTraits::allocate(allocator, 1); + auto ptr = ElemAllocTraits::allocate(allocator, 1); auto msg_ts = rosidl_typesupport_cpp::get_message_type_support_handle(); From 1da30a5e032204d6fb1a4248aaf18e31733612b7 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 5 Aug 2019 11:25:25 +0200 Subject: [PATCH 06/17] updated variable types (auto and const) Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/intra_process_manager.hpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rclcpp/include/rclcpp/intra_process_manager.hpp b/rclcpp/include/rclcpp/intra_process_manager.hpp index efaf269319..c8ccba9077 100644 --- a/rclcpp/include/rclcpp/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager.hpp @@ -322,7 +322,7 @@ class IntraProcessManager ElemAlloc allocator; auto ptr = ElemAllocTraits::allocate(allocator, 1); - auto msg_ts = + const auto msg_ts = rosidl_typesupport_cpp::get_message_type_support_handle(); auto ret = rmw_deserialize(serialized_msg, msg_ts, ptr); @@ -382,10 +382,10 @@ class IntraProcessManager const MessageTIn & msg, const rosidl_message_type_support_t * type_support) { - rcutils_allocator_t rcutils_allocator_ = rcutils_get_default_allocator(); + auto rcutils_allocator_ = rcutils_get_default_allocator(); auto serialized_message = rmw_get_zero_initialized_serialized_message(); - auto ret = rmw_serialized_message_init(&serialized_message, 0, &rcutils_allocator_); + const auto ret = rmw_serialized_message_init(&serialized_message, 0, &rcutils_allocator_); if (ret != RCUTILS_RET_OK) { throw std::runtime_error( "Error allocating resources for serialized message: " + From 069a1eac09a726a28e91394887c56309a52d6aae Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 5 Aug 2019 11:25:57 +0200 Subject: [PATCH 07/17] aligned to code style Signed-off-by: Joshua Hampp --- .../include/rclcpp/intra_process_manager.hpp | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/rclcpp/include/rclcpp/intra_process_manager.hpp b/rclcpp/include/rclcpp/intra_process_manager.hpp index c8ccba9077..1ba30d2ee5 100644 --- a/rclcpp/include/rclcpp/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager.hpp @@ -314,9 +314,9 @@ class IntraProcessManager if (!serialized_msg || serialized_msg->buffer_capacity == 0 || serialized_msg->buffer_length == 0 || - serialized_msg->buffer == nullptr) + !serialized_msg->buffer) { - throw std::runtime_error("failed to deserialize nullptr serialized message"); + throw std::runtime_error("Failed to deserialize nullptr serialized message."); } ElemAlloc allocator; @@ -325,9 +325,9 @@ class IntraProcessManager const auto msg_ts = rosidl_typesupport_cpp::get_message_type_support_handle(); - auto ret = rmw_deserialize(serialized_msg, msg_ts, ptr); + const auto ret = rmw_deserialize(serialized_msg, msg_ts, ptr); if (ret != RMW_RET_OK) { - throw std::runtime_error("failed to deserialize serialized message"); + throw std::runtime_error("Failed to deserialize serialized message."); } return ptr; @@ -345,7 +345,7 @@ class IntraProcessManager serialized_msg->buffer_length == 0 || serialized_msg->buffer == nullptr) { - throw std::runtime_error("failed to deserialize nullptr serialized message b"); + throw std::runtime_error("Failed to deserialize nullptr serialized message."); } return new rmw_serialized_message_t(*serialized_msg); @@ -353,7 +353,8 @@ class IntraProcessManager }; /// Serialize a message if needed. In this case just point to the content (same type). - template + template struct Serializer { void serialize( @@ -401,7 +402,7 @@ class IntraProcessManager type_support, &serialized_message); if (error != RCL_RET_OK) { - throw std::runtime_error("Failed to serialize"); + throw std::runtime_error("Failed to serialize."); } return serialized_message; @@ -409,7 +410,8 @@ class IntraProcessManager }; /// Not serializing a message if same type. - template + template struct TypedSerializer { void serialize( @@ -497,7 +499,7 @@ class IntraProcessManager target_subs_size ); - if (buffer == nullptr) { + if (!buffer) { return; } @@ -544,8 +546,7 @@ class IntraProcessManager TypedSerializer< mapped_ring_buffer::MappedRingBufferBase::ConstVoidSharedPtr, - std::unique_ptr, - MessageT> serializer; + std::unique_ptr> serializer; serializer.serialize(stored_msg, message, buffer->get_type_support()); } else { using MRBMessageAlloc = @@ -570,7 +571,7 @@ class IntraProcessManager return; } - Serializer, MessageT> + Serializer> serializer; serializer.serialize(stored_msg, message, typed_buffer->get_type_support()); } @@ -597,7 +598,7 @@ class IntraProcessManager target_subs_size ); - if (buffer == nullptr) { + if (!buffer) { return; } From 6262b77ceffa525c58d83f1d11352263ea0220c4 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 5 Aug 2019 11:26:08 +0200 Subject: [PATCH 08/17] added parameter description Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/mapped_ring_buffer.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/rclcpp/include/rclcpp/mapped_ring_buffer.hpp b/rclcpp/include/rclcpp/mapped_ring_buffer.hpp index 0578809000..8780442de1 100644 --- a/rclcpp/include/rclcpp/mapped_ring_buffer.hpp +++ b/rclcpp/include/rclcpp/mapped_ring_buffer.hpp @@ -130,6 +130,7 @@ class MappedRingBuffer : public MappedRingBufferBase * The constructor will allocate memory while reserving space. * * \param size size of the ring buffer; must be positive and non-zero. + * \param type_support message type of content; for serialized messages nullptr * \param allocator optional custom allocator */ explicit MappedRingBuffer( From b50401e85287d03200d65d31f1da87afa81bf36b Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 5 Aug 2019 11:26:19 +0200 Subject: [PATCH 09/17] using c++ cast Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/mapped_ring_buffer.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rclcpp/include/rclcpp/mapped_ring_buffer.hpp b/rclcpp/include/rclcpp/mapped_ring_buffer.hpp index 8780442de1..bea81d78ee 100644 --- a/rclcpp/include/rclcpp/mapped_ring_buffer.hpp +++ b/rclcpp/include/rclcpp/mapped_ring_buffer.hpp @@ -355,7 +355,7 @@ class MappedRingBuffer : public MappedRingBufferBase */ void pop(uint64_t key, ConstVoidSharedPtr & value) override { - pop(key, (ConstElemSharedPtr &)value); + pop(key, reinterpret_cast(value)); } @@ -372,7 +372,7 @@ class MappedRingBuffer : public MappedRingBufferBase */ void get(uint64_t key, ConstVoidSharedPtr & value) override { - get(key, (ConstElemSharedPtr &)value); + get(key, reinterpret_cast(value)); } From 05dea5389e734877a71c5d2a162b9a990563955b Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 5 Aug 2019 11:28:04 +0200 Subject: [PATCH 10/17] made error output style consistent Signed-off-by: Joshua Hampp --- rclcpp/src/rclcpp/publisher_base.cpp | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/rclcpp/src/rclcpp/publisher_base.cpp b/rclcpp/src/rclcpp/publisher_base.cpp index d8a4abe692..56ba8d50ea 100644 --- a/rclcpp/src/rclcpp/publisher_base.cpp +++ b/rclcpp/src/rclcpp/publisher_base.cpp @@ -183,7 +183,7 @@ PublisherBase::get_subscription_count() const } } if (RCL_RET_OK != status) { - rclcpp::exceptions::throw_from_rcl_error(status, "failed to get get subscription count"); + rclcpp::exceptions::throw_from_rcl_error(status, "Failed to get get subscription count."); } return inter_process_subscription_count; } @@ -199,8 +199,8 @@ PublisherBase::get_intra_process_subscription_count() const // TODO(ivanpauno): should this just return silently? Or maybe return with a warning? // Same as wjwwood comment in publisher_factory create_shared_publish_callback. throw std::runtime_error( - "intra process subscriber count called after " - "destruction of intra process manager"); + "Intra process subscriber count called after " + "destruction of intra process manager."); } return ipm->get_subscription_count(intra_process_publisher_id_); } @@ -210,7 +210,7 @@ PublisherBase::get_actual_qos() const { const rmw_qos_profile_t * qos = rcl_publisher_get_actual_qos(&publisher_handle_); if (!qos) { - auto msg = std::string("failed to get qos settings: ") + rcl_get_error_string().str; + auto msg = std::string("Failed to get qos settings: ") + rcl_get_error_string().str; rcl_reset_error(); throw std::runtime_error(msg); } @@ -235,14 +235,14 @@ PublisherBase::operator==(const rmw_gid_t * gid) const bool result = false; auto ret = rmw_compare_gids_equal(gid, &this->get_gid(), &result); if (ret != RMW_RET_OK) { - auto msg = std::string("failed to compare gids: ") + rmw_get_error_string().str; + auto msg = std::string("Failed to compare gids: ") + rmw_get_error_string().str; rmw_reset_error(); throw std::runtime_error(msg); } if (!result) { ret = rmw_compare_gids_equal(gid, &this->get_intra_process_gid(), &result); if (ret != RMW_RET_OK) { - auto msg = std::string("failed to compare gids: ") + rmw_get_error_string().str; + auto msg = std::string("Failed to compare gids: ") + rmw_get_error_string().str; rmw_reset_error(); throw std::runtime_error(msg); } @@ -266,7 +266,7 @@ PublisherBase::setup_intra_process( // Intraprocess configuration is not allowed with "durability" qos policy non "volatile". if (this->get_actual_qos().durability != RMW_QOS_POLICY_DURABILITY_VOLATILE) { throw std::invalid_argument( - "intraprocess communication is not allowed with durability qos policy non-volatile"); + "Intraprocess communication is not allowed with durability qos policy non-volatile"); } const char * topic_name = this->get_topic_name(); if (!topic_name) { @@ -292,7 +292,7 @@ PublisherBase::setup_intra_process( rcl_node_get_namespace(rcl_node_handle)); } - rclcpp::exceptions::throw_from_rcl_error(ret, "could not create intra process publisher"); + rclcpp::exceptions::throw_from_rcl_error(ret, "Could not create intra process publisher"); } intra_process_publisher_id_ = intra_process_publisher_id; @@ -311,7 +311,7 @@ PublisherBase::setup_intra_process( publisher_rmw_handle, &intra_process_rmw_gid_); if (rmw_ret != RMW_RET_OK) { auto msg = - std::string("failed to create intra process publisher gid: ") + rmw_get_error_string().str; + std::string("Failed to create intra process publisher gid: ") + rmw_get_error_string().str; rmw_reset_error(); throw std::runtime_error(msg); } @@ -338,7 +338,7 @@ void PublisherBase::do_serialized_publish(const rcl_serialized_message_t * serialized_msg) { if (!serialized_msg) { - throw std::runtime_error("cannot publisher msg which is a null pointer"); + throw std::runtime_error("Cannot publisher msg which is a null pointer."); } bool inter_process_publish_needed = @@ -348,7 +348,7 @@ PublisherBase::do_serialized_publish(const rcl_serialized_message_t * serialized auto ipm = weak_ipm_.lock(); if (!ipm) { throw std::runtime_error( - "intra process publish called after destruction of intra process manager"); + "Intra process publish called after destruction of intra process manager."); } const uint64_t message_seq = ipm->template store_intra_process_message( @@ -359,7 +359,7 @@ PublisherBase::do_serialized_publish(const rcl_serialized_message_t * serialized if (inter_process_publish_needed) { auto status = rcl_publish_serialized_message(&publisher_handle_, serialized_msg, nullptr); if (RCL_RET_OK != status) { - rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message"); + rclcpp::exceptions::throw_from_rcl_error(status, "Failed to publish serialized message."); } } } @@ -382,6 +382,6 @@ PublisherBase::do_intra_process_publish(uint64_t message_seq) } } if (RCL_RET_OK != status) { - rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish intra process message"); + rclcpp::exceptions::throw_from_rcl_error(status, "Failed to publish intra process message."); } } From e649ad89c24e170d2f0b568f23626b6868e1499a Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 5 Aug 2019 11:28:11 +0200 Subject: [PATCH 11/17] reordered includes Signed-off-by: Joshua Hampp --- rclcpp/test/test_intra_process_communication.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index b66b4a31bb..4443da553b 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -17,9 +17,9 @@ #include #include -#include #include #include +#include #include "rclcpp/exceptions.hpp" #include "rclcpp/publisher.hpp" From 582600df2fc71b2caabe4a2ee4f561a399c75f52 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 5 Aug 2019 11:28:37 +0200 Subject: [PATCH 12/17] replaced global variables by function call Signed-off-by: Joshua Hampp --- rclcpp/test/test_intra_process_communication.cpp | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index 4443da553b..558d90f866 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -26,7 +26,11 @@ #include "rclcpp/rclcpp.hpp" -static int g_counter = 0; +int32_t & get_test_allocation_counter() +{ + static int32_t counter = 0; + return counter; +} std::shared_ptr make_serialized_string_msg( const std::shared_ptr & stringMsg) @@ -41,11 +45,11 @@ std::shared_ptr make_serialized_string_msg( throw std::runtime_error("Error allocating resources " + std::to_string(ret)); } - ++g_counter; + ++get_test_allocation_counter(); auto serialized_data = std::shared_ptr( msg, [](rcutils_uint8_array_t * msg) { - --g_counter; + --get_test_allocation_counter(); int error = rcutils_uint8_array_fini(msg); delete msg; if (error != RCUTILS_RET_OK) { @@ -202,7 +206,7 @@ TEST_P(TestPublisherSubscriptionSerialized, publish_serialized) EXPECT_EQ(counts[1], 12u); } - EXPECT_EQ(g_counter, 0); + EXPECT_EQ(get_test_allocation_counter(), 0); } auto get_new_context() From 46f809c3fbbe19a7e8b77d2009ce6b15c8dced89 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 5 Aug 2019 11:30:38 +0200 Subject: [PATCH 13/17] fixed deprecated warnings Signed-off-by: Joshua Hampp --- .../test/test_intra_process_communication.cpp | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index 558d90f866..dee4177185 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -192,10 +192,23 @@ TEST_P(TestPublisherSubscriptionSerialized, publish_serialized) *stringMsg)); std::unique_ptr msg0U(new rcutils_uint8_array_t(*msg0)); - publisher->publish(msg0); + // Now deprecated functions. +#if !defined(_WIN32) +# pragma GCC diagnostic push +# pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#else // !defined(_WIN32) +# pragma warning(push) +# pragma warning(disable: 4996) +#endif + publisher->publish(*msg0); publisher->publish(stringMsg); - publisher->publish(std::move(msg0U)); + publisher->publish(*msg0U); publisher->publish(std::move(stringMsgU)); +#if !defined(_WIN32) +# pragma GCC diagnostic pop +#else // !defined(_WIN32) +# pragma warning(pop) +#endif rclcpp::spin_some(node); rclcpp::sleep_for(offset); From 7f474f7cc89442fb8dfb791e6b6fc8e42f64ab52 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 5 Aug 2019 11:31:38 +0200 Subject: [PATCH 14/17] fixed variable name Signed-off-by: Joshua Hampp --- rclcpp/test/test_intra_process_communication.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index dee4177185..44772b82be 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -62,10 +62,10 @@ std::shared_ptr make_serialized_string_msg( serialized_data->buffer_length = message_size; - static auto imageTypeSupport = + static auto type = rosidl_typesupport_cpp::get_message_type_support_handle (); - auto error = rmw_serialize(stringMsg.get(), imageTypeSupport, serialized_data.get()); + auto error = rmw_serialize(stringMsg.get(), type, serialized_data.get()); if (error != RMW_RET_OK) { RCUTILS_LOG_ERROR_NAMED("m4_test", "Something went wrong preparing the serialized message"); } From 24a585d805c4e8d25892b8f0a556d73005a8b405 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 5 Aug 2019 11:31:51 +0200 Subject: [PATCH 15/17] fixed logger name Signed-off-by: Joshua Hampp --- rclcpp/test/test_intra_process_communication.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index 44772b82be..b96b904ab3 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -54,7 +54,7 @@ std::shared_ptr make_serialized_string_msg( delete msg; if (error != RCUTILS_RET_OK) { RCUTILS_LOG_ERROR_NAMED( - "m4_test", + "test_intra_process_communication", "Leaking memory %i", error); } @@ -67,7 +67,8 @@ std::shared_ptr make_serialized_string_msg( (); auto error = rmw_serialize(stringMsg.get(), type, serialized_data.get()); if (error != RMW_RET_OK) { - RCUTILS_LOG_ERROR_NAMED("m4_test", "Something went wrong preparing the serialized message"); + RCUTILS_LOG_ERROR_NAMED("test_intra_process_communication", + "Something went wrong preparing the serialized message"); } return serialized_data; From df09321aedcceb704fa14f44dd4b21ebad1c16d7 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 5 Aug 2019 11:31:59 +0200 Subject: [PATCH 16/17] removed unused methods Signed-off-by: Joshua Hampp --- rclcpp/test/test_intra_process_communication.cpp | 4 ---- 1 file changed, 4 deletions(-) diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index b96b904ab3..e99a05b8cb 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -101,10 +101,6 @@ class TestPublisherSubscriptionSerialized : public ::testing::TestWithParam Date: Wed, 7 Aug 2019 09:35:09 +0200 Subject: [PATCH 17/17] reorderd includes Signed-off-by: Joshua Hampp --- rclcpp/test/test_intra_process_communication.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index e99a05b8cb..059291f9ae 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -18,8 +18,8 @@ #include #include -#include #include +#include #include "rclcpp/exceptions.hpp" #include "rclcpp/publisher.hpp"