diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 07a67622b0..06e4ab9797 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -208,6 +208,18 @@ if(BUILD_TESTING) "rosidl_typesupport_cpp" ) endif() + ament_add_gtest(test_intra_process_communication test/test_intra_process_communication.cpp) + if(TARGET test_intra_process_communication) + ament_target_dependencies(test_intra_process_communication + "rcl" + "rcl_interfaces" + "rmw" + "rosidl_generator_cpp" + "rosidl_typesupport_cpp" + "test_msgs" + ) + endif() + target_link_libraries(test_intra_process_communication ${PROJECT_NAME}) ament_add_gtest(test_node test/test_node.cpp) if(TARGET test_node) ament_target_dependencies(test_node diff --git a/rclcpp/include/rclcpp/intra_process_manager.hpp b/rclcpp/include/rclcpp/intra_process_manager.hpp index 5d3abed3f6..1ba30d2ee5 100644 --- a/rclcpp/include/rclcpp/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager.hpp @@ -15,6 +15,7 @@ #ifndef RCLCPP__INTRA_PROCESS_MANAGER_HPP_ #define RCLCPP__INTRA_PROCESS_MANAGER_HPP_ +#include #include #include @@ -27,6 +28,7 @@ #include #include #include +#include #include "rclcpp/allocator/allocator_deleter.hpp" #include "rclcpp/intra_process_manager_impl.hpp" @@ -241,10 +243,14 @@ class IntraProcessManager { using MRBMessageAlloc = typename std::allocator_traits::template rebind_alloc; using TypedMRB = typename mapped_ring_buffer::MappedRingBuffer; + constexpr bool serialized = + rclcpp::subscription_traits::is_serialized_subscription_argument::value; uint64_t message_seq = 0; mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->get_publisher_info_for_id( - intra_process_publisher_id, message_seq); - typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(buffer); + intra_process_publisher_id, + message_seq, + serialized); + typename TypedMRB::SharedPtr typed_buffer = std::dynamic_pointer_cast(buffer); if (!typed_buffer) { throw std::runtime_error("Typecast failed due to incorrect message type"); } @@ -254,7 +260,7 @@ class IntraProcessManager // TODO(wjwwood): do something when a message was displaced. log debug? (void)did_replace; // Avoid unused variable warning. - impl_->store_intra_process_message(intra_process_publisher_id, message_seq); + impl_->store_intra_process_message(intra_process_publisher_id, message_seq, serialized); // Return the message sequence which is sent to the subscription. return message_seq; @@ -262,7 +268,10 @@ class IntraProcessManager template< typename MessageT, typename Alloc = std::allocator, - typename Deleter = std::default_delete> + typename Deleter = std::conditional< + rclcpp::subscription_traits::is_serialized_subscription_argument::value, + mapped_ring_buffer::deallocate_rmw_serialized_message, + std::default_delete>> uint64_t store_intra_process_message( uint64_t intra_process_publisher_id, @@ -270,10 +279,14 @@ class IntraProcessManager { using MRBMessageAlloc = typename std::allocator_traits::template rebind_alloc; using TypedMRB = typename mapped_ring_buffer::MappedRingBuffer; + constexpr bool serialized = + rclcpp::subscription_traits::is_serialized_subscription_argument::value; uint64_t message_seq = 0; mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->get_publisher_info_for_id( - intra_process_publisher_id, message_seq); - typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(buffer); + intra_process_publisher_id, + message_seq, + serialized); + typename TypedMRB::SharedPtr typed_buffer = std::dynamic_pointer_cast(buffer); if (!typed_buffer) { throw std::runtime_error("Typecast failed due to incorrect message type"); } @@ -283,12 +296,150 @@ class IntraProcessManager // TODO(wjwwood): do something when a message was displaced. log debug? (void)did_replace; // Avoid unused variable warning. - impl_->store_intra_process_message(intra_process_publisher_id, message_seq); + impl_->store_intra_process_message(intra_process_publisher_id, message_seq, serialized); // Return the message sequence which is sent to the subscription. return message_seq; } + /// Helper to deserialize message from rmw_serialized_message_t to some message type + template + struct Deserializer + { + using ElemAllocTraits = allocator::AllocRebind; + using ElemAlloc = typename ElemAllocTraits::allocator_type; + + MessageT * deserialize(const rmw_serialized_message_t * serialized_msg) + { + if (!serialized_msg || + serialized_msg->buffer_capacity == 0 || + serialized_msg->buffer_length == 0 || + !serialized_msg->buffer) + { + throw std::runtime_error("Failed to deserialize nullptr serialized message."); + } + + ElemAlloc allocator; + auto ptr = ElemAllocTraits::allocate(allocator, 1); + + const auto msg_ts = + rosidl_typesupport_cpp::get_message_type_support_handle(); + + const auto ret = rmw_deserialize(serialized_msg, msg_ts, ptr); + if (ret != RMW_RET_OK) { + throw std::runtime_error("Failed to deserialize serialized message."); + } + + return ptr; + } + }; + + /// Helper to deserialize message from rmw_serialized_message_t when it should be kept serialized + template + struct Deserializer + { + rmw_serialized_message_t * deserialize(const rmw_serialized_message_t * serialized_msg) + { + if (!serialized_msg || + serialized_msg->buffer_capacity == 0 || + serialized_msg->buffer_length == 0 || + serialized_msg->buffer == nullptr) + { + throw std::runtime_error("Failed to deserialize nullptr serialized message."); + } + + return new rmw_serialized_message_t(*serialized_msg); + } + }; + + /// Serialize a message if needed. In this case just point to the content (same type). + template + struct Serializer + { + void serialize( + MessageTIn & data, + MessageTOutPtr & message, + const rosidl_message_type_support_t * type_support) + { + (void)type_support; + message = std::move(data); + } + }; + + /// Serialize a message if needed. rmw_serialized_message_t to a message type. + template + struct Serializer + { + void serialize( + MessageTIn & data, + MessageTOutPtr & message, + const rosidl_message_type_support_t * type_support) + { + message.reset(new rmw_serialized_message_t(serialize(data, type_support))); + } + + static rmw_serialized_message_t serialize( + const MessageTIn & msg, + const rosidl_message_type_support_t * type_support) + { + auto rcutils_allocator_ = rcutils_get_default_allocator(); + + auto serialized_message = rmw_get_zero_initialized_serialized_message(); + const auto ret = rmw_serialized_message_init(&serialized_message, 0, &rcutils_allocator_); + if (ret != RCUTILS_RET_OK) { + throw std::runtime_error( + "Error allocating resources for serialized message: " + + std::string(rcutils_get_error_string().str)); + } + + if (!msg) { + return serialized_message; + } + + auto error = rmw_serialize( + msg.get(), + type_support, + &serialized_message); + if (error != RCL_RET_OK) { + throw std::runtime_error("Failed to serialize."); + } + + return serialized_message; + } + }; + + /// Not serializing a message if same type. + template + struct TypedSerializer + { + void serialize( + MessageTIn & data, + MessageTOutPtr & message, + const rosidl_message_type_support_t * type_support) + { + (void)type_support; + (void)message; + (void)data; + throw std::runtime_error("Serialization case not supported!"); + } + }; + + /// Serialize a message if needed. rmw_serialized_message_t to a message type. + template + struct TypedSerializer + { + void serialize( + MessageTIn & data, + MessageTOutPtr & message, + const rosidl_message_type_support_t * type_support) + { + Serializer s; + message.reset(new rmw_serialized_message_t(s.serialize(data, type_support))); + } + }; + /// Take an intra process message. /** * The intra_process_publisher_id and message_sequence_number parameters @@ -326,7 +477,10 @@ class IntraProcessManager */ template< typename MessageT, typename Alloc = std::allocator, - typename Deleter = std::default_delete> + typename Deleter = std::conditional< + rclcpp::subscription_traits::is_serialized_subscription_argument::value, + mapped_ring_buffer::deallocate_rmw_serialized_message, + std::default_delete>> void take_intra_process_message( uint64_t intra_process_publisher_id, @@ -334,8 +488,6 @@ class IntraProcessManager uint64_t requesting_subscriptions_intra_process_id, std::unique_ptr & message) { - using MRBMessageAlloc = typename std::allocator_traits::template rebind_alloc; - using TypedMRB = mapped_ring_buffer::MappedRingBuffer; message = nullptr; size_t target_subs_size = 0; @@ -346,17 +498,82 @@ class IntraProcessManager requesting_subscriptions_intra_process_id, target_subs_size ); - typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(buffer); - if (!typed_buffer) { + + if (!buffer) { return; } - // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left. - if (target_subs_size) { - // There are more subscriptions to serve, return a copy. - typed_buffer->get(message_sequence_number, message); + + if (buffer->is_serialized()) { + using MRBMessageAlloc = typename std::allocator_traits + >::template rebind_alloc; + using TypedMRB = + mapped_ring_buffer::MappedRingBuffer; + typename TypedMRB::SharedPtr typed_buffer = std::dynamic_pointer_cast(buffer); + if (!typed_buffer) { + return; + } + + typename TypedMRB::ElemUniquePtr serialized_msg; + + // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left. + if (target_subs_size) { + // There are more subscriptions to serve, return a copy. + typed_buffer->get(message_sequence_number, serialized_msg); + } else { + // This is the last one to be returned, transfer ownership. + typed_buffer->pop(message_sequence_number, serialized_msg); + } + + Deserializer deserializer; + message.reset(deserializer.deserialize(serialized_msg.get())); + if (rclcpp::subscription_traits::is_serialized_subscription_argument::value) { + serialized_msg.release(); + } + } else if (rclcpp::subscription_traits::is_serialized_subscription_argument::value) { + mapped_ring_buffer::MappedRingBufferBase::ConstVoidSharedPtr stored_msg; + + // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left. + if (target_subs_size) { + // There are more subscriptions to serve, return a copy. + buffer->get(message_sequence_number, stored_msg); + } else { + // This is the last one to be returned, transfer ownership. + buffer->pop(message_sequence_number, stored_msg); + } + if (!stored_msg) { + return; + } + + TypedSerializer< + mapped_ring_buffer::MappedRingBufferBase::ConstVoidSharedPtr, + std::unique_ptr> serializer; + serializer.serialize(stored_msg, message, buffer->get_type_support()); } else { - // This is the last one to be returned, transfer ownership. - typed_buffer->pop(message_sequence_number, message); + using MRBMessageAlloc = + typename std::allocator_traits::template rebind_alloc; + using TypedMRB = mapped_ring_buffer::MappedRingBuffer; + typename TypedMRB::SharedPtr typed_buffer = std::dynamic_pointer_cast(buffer); + if (!typed_buffer) { + return; + } + + typename TypedMRB::ElemUniquePtr stored_msg; + + // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left. + if (target_subs_size) { + // There are more subscriptions to serve, return a copy. + typed_buffer->get(message_sequence_number, stored_msg); + } else { + // This is the last one to be returned, transfer ownership. + typed_buffer->pop(message_sequence_number, stored_msg); + } + if (!stored_msg) { + return; + } + + Serializer> + serializer; + serializer.serialize(stored_msg, message, typed_buffer->get_type_support()); } } @@ -369,29 +586,71 @@ class IntraProcessManager uint64_t requesting_subscriptions_intra_process_id, std::shared_ptr & message) { - using MRBMessageAlloc = typename std::allocator_traits::template rebind_alloc; - using TypedMRB = mapped_ring_buffer::MappedRingBuffer; message = nullptr; size_t target_subs_size = 0; std::lock_guard lock(take_mutex_); - mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->take_intra_process_message( + mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = + impl_->take_intra_process_message( intra_process_publisher_id, message_sequence_number, requesting_subscriptions_intra_process_id, target_subs_size - ); - typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(buffer); - if (!typed_buffer) { + ); + + if (!buffer) { return; } - // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left. - if (target_subs_size) { - // There are more subscriptions to serve, return a copy. - typed_buffer->get(message_sequence_number, message); + + if (buffer->is_serialized()) { + using MRBMessageAlloc = typename std::allocator_traits + >::template rebind_alloc; + using TypedMRB = + mapped_ring_buffer::MappedRingBuffer; + typename TypedMRB::SharedPtr typed_buffer = std::dynamic_pointer_cast(buffer); + if (!typed_buffer) { + return; + } + + std::shared_ptr serialized_msg; + + // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left. + if (target_subs_size) { + // There are more subscriptions to serve, return a copy. + typed_buffer->get(message_sequence_number, serialized_msg); + } else { + // This is the last one to be returned, transfer ownership. + typed_buffer->pop(message_sequence_number, serialized_msg); + } + + Deserializer deserializer; + message.reset(deserializer.deserialize(serialized_msg.get())); } else { - // This is the last one to be returned, transfer ownership. - typed_buffer->pop(message_sequence_number, message); + using MRBMessageAlloc = typename std::allocator_traits + ::template rebind_alloc; + using TypedMRB = mapped_ring_buffer::MappedRingBuffer; + typename TypedMRB::SharedPtr typed_buffer = std::dynamic_pointer_cast(buffer); + if (!typed_buffer) { + return; + } + + std::shared_ptr stored_msg; + + // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left. + if (target_subs_size) { + // There are more subscriptions to serve, return a copy. + typed_buffer->get(message_sequence_number, stored_msg); + } else { + // This is the last one to be returned, transfer ownership. + typed_buffer->pop(message_sequence_number, stored_msg); + } + if (!stored_msg) { + return; + } + + Serializer, std::shared_ptr, MessageT> + serializer; + serializer.serialize(stored_msg, message, typed_buffer->get_type_support()); } } diff --git a/rclcpp/include/rclcpp/intra_process_manager_impl.hpp b/rclcpp/include/rclcpp/intra_process_manager_impl.hpp index ab29af7b92..c4759d5a9c 100644 --- a/rclcpp/include/rclcpp/intra_process_manager_impl.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager_impl.hpp @@ -69,10 +69,14 @@ class IntraProcessManagerImplBase virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr get_publisher_info_for_id( uint64_t intra_process_publisher_id, - uint64_t & message_seq) = 0; + uint64_t & message_seq, + const bool serialized) = 0; virtual void - store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq) = 0; + store_intra_process_message( + uint64_t intra_process_publisher_id, + uint64_t message_seq, + const bool serialized) = 0; virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr take_intra_process_message( @@ -116,7 +120,7 @@ class IntraProcessManagerImpl : public IntraProcessManagerImplBase // remove references to this subscription's id. for (auto & publisher_pair : publishers_) { for (auto & sub_pair : publisher_pair.second.target_subscriptions_by_message_sequence) { - sub_pair.second.erase(intra_process_subscription_id); + sub_pair.second.first.erase(intra_process_subscription_id); } } } @@ -135,6 +139,7 @@ class IntraProcessManagerImpl : public IntraProcessManagerImplBase publishers_[id].sequence_number.store(0); publishers_[id].buffer = mrb; + publishers_[id].buffer_serialized = mrb->make_serialized_mapped_ring_buffer(); publishers_[id].target_subscriptions_by_message_sequence.reserve(size); } @@ -148,7 +153,8 @@ class IntraProcessManagerImpl : public IntraProcessManagerImplBase mapped_ring_buffer::MappedRingBufferBase::SharedPtr get_publisher_info_for_id( uint64_t intra_process_publisher_id, - uint64_t & message_seq) + uint64_t & message_seq, + const bool serialized) { std::lock_guard lock(runtime_mutex_); auto it = publishers_.find(intra_process_publisher_id); @@ -159,11 +165,17 @@ class IntraProcessManagerImpl : public IntraProcessManagerImplBase // Calculate the next message sequence number. message_seq = info.sequence_number.fetch_add(1); + if (serialized) { + return info.buffer_serialized; + } return info.buffer; } void - store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq) + store_intra_process_message( + uint64_t intra_process_publisher_id, + uint64_t message_seq, + const bool serialized) { std::lock_guard lock(runtime_mutex_); auto it = publishers_.find(intra_process_publisher_id); @@ -182,17 +194,19 @@ class IntraProcessManagerImpl : public IntraProcessManagerImplBase // Store the list for later comparison. if (info.target_subscriptions_by_message_sequence.count(message_seq) == 0) { info.target_subscriptions_by_message_sequence.emplace( - message_seq, AllocSet(std::less(), uint64_allocator)); + message_seq, + std::make_pair(AllocSet(std::less(), uint64_allocator), + serialized)); } else { - info.target_subscriptions_by_message_sequence[message_seq].clear(); + info.target_subscriptions_by_message_sequence[message_seq].first.clear(); } std::copy( destined_subscriptions.begin(), destined_subscriptions.end(), // Memory allocation occurs in info.target_subscriptions_by_message_sequence[message_seq] std::inserter( - info.target_subscriptions_by_message_sequence[message_seq], + info.target_subscriptions_by_message_sequence[message_seq].first, // This ends up only being a hint to std::set, could also be .begin(). - info.target_subscriptions_by_message_sequence[message_seq].end() + info.target_subscriptions_by_message_sequence[message_seq].first.end() ) ); } @@ -217,13 +231,15 @@ class IntraProcessManagerImpl : public IntraProcessManagerImplBase } // Figure out how many subscriptions are left. AllocSet * target_subs; + bool serialized = false; { auto it = info->target_subscriptions_by_message_sequence.find(message_sequence_number); if (it == info->target_subscriptions_by_message_sequence.end()) { // Message is no longer being stored by this publisher. return 0; } - target_subs = &it->second; + target_subs = &it->second.first; + serialized = it->second.second; } { auto it = std::find( @@ -236,6 +252,10 @@ class IntraProcessManagerImpl : public IntraProcessManagerImplBase target_subs->erase(it); } size = target_subs->size(); + // did we store a serialized message? + if (serialized) { + return info->buffer_serialized; + } return info->buffer; } @@ -330,12 +350,14 @@ class IntraProcessManagerImpl : public IntraProcessManagerImplBase PublisherBase::WeakPtr publisher; std::atomic sequence_number; mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer; + mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer_serialized; using TargetSubscriptionsMap = std::unordered_map< - uint64_t, AllocSet, + uint64_t, std::pair, std::hash, std::equal_to, - RebindAlloc>>; + RebindAlloc>>>; TargetSubscriptionsMap target_subscriptions_by_message_sequence; + TargetSubscriptionsMap target_subscriptions_by_message_sequence_serialized; }; using PublisherMap = std::unordered_map< diff --git a/rclcpp/include/rclcpp/mapped_ring_buffer.hpp b/rclcpp/include/rclcpp/mapped_ring_buffer.hpp index c8fdf6493d..bea81d78ee 100644 --- a/rclcpp/include/rclcpp/mapped_ring_buffer.hpp +++ b/rclcpp/include/rclcpp/mapped_ring_buffer.hpp @@ -21,11 +21,14 @@ #include #include #include +#include #include #include #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/macros.hpp" +#include "rclcpp/subscription_traits.hpp" +#include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" namespace rclcpp @@ -33,10 +36,64 @@ namespace rclcpp namespace mapped_ring_buffer { +/// Deleter for serialized messages +struct deallocate_rmw_serialized_message +{ + void operator()(rmw_serialized_message_t * msg) const + { + if (msg) { + int error = rmw_serialized_message_fini(msg); + delete msg; + if (error != RCUTILS_RET_OK) { + throw std::runtime_error("Leaking memory. Error: " + + std::string(rcutils_get_error_string().str)); + } + } + } +}; + class RCLCPP_PUBLIC MappedRingBufferBase { public: RCLCPP_SMART_PTR_DEFINITIONS(MappedRingBufferBase) + + using ConstVoidSharedPtr = std::shared_ptr; + + /// Returns true if the content is a serialized message. + virtual bool is_serialized() = 0; + + /// Create a mapped ring buffer for serialized messages with same allocator. + virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr + make_serialized_mapped_ring_buffer() const = 0; + + /// Returns type support of message type. For serialized case return nullptr. + virtual const rosidl_message_type_support_t * get_type_support() const = 0; + + /// Give the ownership of the stored value to the caller, at the given key. + /** + * The key is matched if an element in the ring buffer has a matching key. + * + * The key is not guaranteed to be unique, see the class docs for more. + * + * The contents of value before the method is called are discarded. + * + * \param key the key associated with the stored value + * \param value if the key is found, the value is stored in this parameter + */ + virtual void pop(uint64_t key, ConstVoidSharedPtr & value) = 0; + + /// Share ownership of the value stored in the ring buffer at the given key. + /** + * The key is matched if an element in the ring buffer has a matching key. + * + * The key is not guaranteed to be unique, see the class docs for more. + * + * The contents of value before the method is called are discarded. + * + * \param key the key associated with the stored value + * \param value if the key is found, the value is stored in this parameter + */ + virtual void get(uint64_t key, ConstVoidSharedPtr & value) = 0; }; /// Ring buffer container of shared_ptr's or unique_ptr's of T, which can be accessed by a key. @@ -73,10 +130,13 @@ class MappedRingBuffer : public MappedRingBufferBase * The constructor will allocate memory while reserving space. * * \param size size of the ring buffer; must be positive and non-zero. + * \param type_support message type of content; for serialized messages nullptr * \param allocator optional custom allocator */ - explicit MappedRingBuffer(size_t size, std::shared_ptr allocator = nullptr) - : elements_(size), head_(0) + explicit MappedRingBuffer( + size_t size, const rosidl_message_type_support_t * type_support, + std::shared_ptr allocator = nullptr) + : elements_(size), head_(0), type_support_(type_support) { if (size == 0) { throw std::invalid_argument("size must be a positive, non-zero value"); @@ -129,6 +189,13 @@ class MappedRingBuffer : public MappedRingBufferBase } } + /// Check if content is serialized. + bool + is_serialized() override + { + return type_support_ == nullptr; + } + /// Share ownership of the value stored in the ring buffer at the given key. /** * The key is matched if an element in the ring buffer has a matching key. @@ -275,6 +342,40 @@ class MappedRingBuffer : public MappedRingBufferBase return did_replace; } + /// Give the ownership of the stored value to the caller, at the given key. + /** + * The key is matched if an element in the ring buffer has a matching key. + * + * The key is not guaranteed to be unique, see the class docs for more. + * + * The contents of value before the method is called are discarded. + * + * \param key the key associated with the stored value + * \param value if the key is found, the value is stored in this parameter + */ + void pop(uint64_t key, ConstVoidSharedPtr & value) override + { + pop(key, reinterpret_cast(value)); + } + + + /// Share ownership of the value stored in the ring buffer at the given key. + /** + * The key is matched if an element in the ring buffer has a matching key. + * + * The key is not guaranteed to be unique, see the class docs for more. + * + * The contents of value before the method is called are discarded. + * + * \param key the key associated with the stored value + * \param value if the key is found, the value is stored in this parameter + */ + void get(uint64_t key, ConstVoidSharedPtr & value) override + { + get(key, reinterpret_cast(value)); + } + + /// Return true if the key is found in the ring buffer, otherwise false. bool has_key(uint64_t key) @@ -283,6 +384,23 @@ class MappedRingBuffer : public MappedRingBufferBase return elements_.end() != get_iterator_of_key(key); } + /// get type support for stored messages + const rosidl_message_type_support_t * + get_type_support() const override + { + return type_support_; + } + + /// create a corresponding ring buffer for serialized messages of type rmw_serialized_message_t + mapped_ring_buffer::MappedRingBufferBase::SharedPtr + make_serialized_mapped_ring_buffer() const override + { + return mapped_ring_buffer::MappedRingBuffer< + rmw_serialized_message_t, + typename std::allocator_traits::template rebind_alloc + >::make_shared(elements_.size(), nullptr); + } + private: RCLCPP_DISABLE_COPY(MappedRingBuffer) @@ -311,6 +429,7 @@ class MappedRingBuffer : public MappedRingBufferBase size_t head_; std::shared_ptr allocator_; std::mutex data_mutex_; + const rosidl_message_type_support_t * type_support_; }; } // namespace mapped_ring_buffer diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index a0bd342cc9..582fa226e9 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -46,6 +46,7 @@ template> class Publisher : public PublisherBase { public: + using PublisherBase::publish; using MessageAllocTraits = allocator::AllocRebind; using MessageAlloc = typename MessageAllocTraits::allocator_type; using MessageDeleter = allocator::Deleter; @@ -88,7 +89,10 @@ class Publisher : public PublisherBase return mapped_ring_buffer::MappedRingBuffer< MessageT, typename Publisher::MessageAlloc - >::make_shared(size, this->get_allocator()); + >::make_shared( + size, + rosidl_typesupport_cpp::get_message_type_support_handle(), + this->get_allocator()); } /// Send a message to the topic for this publisher. @@ -170,23 +174,6 @@ class Publisher : public PublisherBase return this->publish(*msg); } - void - publish(const rcl_serialized_message_t & serialized_msg) - { - return this->do_serialized_publish(&serialized_msg); - } - -// Skip deprecated attribute in windows, as it raise a warning in template specialization. -#if !defined(_WIN32) - [[deprecated( - "Use publish(*serialized_msg). Check against nullptr before calling if necessary.")]] -#endif - void - publish(const rcl_serialized_message_t * serialized_msg) - { - return this->do_serialized_publish(serialized_msg); - } - // Skip deprecated attribute in windows, as it raise a warning in template specialization. #if !defined(_WIN32) [[deprecated( @@ -223,41 +210,6 @@ class Publisher : public PublisherBase } } - void - do_serialized_publish(const rcl_serialized_message_t * serialized_msg) - { - if (intra_process_is_enabled_) { - // TODO(Karsten1987): support serialized message passed by intraprocess - throw std::runtime_error("storing serialized messages in intra process is not supported yet"); - } - auto status = rcl_publish_serialized_message(&publisher_handle_, serialized_msg, nullptr); - if (RCL_RET_OK != status) { - rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message"); - } - } - - void - do_intra_process_publish(uint64_t message_seq) - { - rcl_interfaces::msg::IntraProcessMessage ipm; - ipm.publisher_id = intra_process_publisher_id_; - ipm.message_sequence = message_seq; - auto status = rcl_publish(&intra_process_publisher_handle_, &ipm, nullptr); - if (RCL_RET_PUBLISHER_INVALID == status) { - rcl_reset_error(); // next call will reset error message if not context - if (rcl_publisher_is_valid_except_context(&intra_process_publisher_handle_)) { - rcl_context_t * context = rcl_publisher_get_context(&intra_process_publisher_handle_); - if (nullptr != context && !rcl_context_is_valid(context)) { - // publisher is invalid due to context being shutdown - return; - } - } - } - if (RCL_RET_OK != status) { - rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish intra process message"); - } - } - uint64_t store_intra_process_message( uint64_t publisher_id, diff --git a/rclcpp/include/rclcpp/publisher_base.hpp b/rclcpp/include/rclcpp/publisher_base.hpp index afb68302f8..e13c263d4d 100644 --- a/rclcpp/include/rclcpp/publisher_base.hpp +++ b/rclcpp/include/rclcpp/publisher_base.hpp @@ -195,7 +195,24 @@ class PublisherBase IntraProcessManagerSharedPtr ipm, const rcl_publisher_options_t & intra_process_options); + void + publish(const rcl_serialized_message_t & serialized_msg); + +// Skip deprecated attribute in windows, as it raise a warning in template specialization. +#if !defined(_WIN32) + [[deprecated( + "Use publish(*serialized_msg). Check against nullptr before calling if necessary.")]] +#endif + void + publish(const rcl_serialized_message_t * serialized_msg); + protected: + void + do_serialized_publish(const rcl_serialized_message_t * serialized_msg); + + void + do_intra_process_publish(uint64_t message_seq); + template void add_event_handler( diff --git a/rclcpp/src/rclcpp/publisher_base.cpp b/rclcpp/src/rclcpp/publisher_base.cpp index a2c6ceac91..56ba8d50ea 100644 --- a/rclcpp/src/rclcpp/publisher_base.cpp +++ b/rclcpp/src/rclcpp/publisher_base.cpp @@ -183,7 +183,7 @@ PublisherBase::get_subscription_count() const } } if (RCL_RET_OK != status) { - rclcpp::exceptions::throw_from_rcl_error(status, "failed to get get subscription count"); + rclcpp::exceptions::throw_from_rcl_error(status, "Failed to get get subscription count."); } return inter_process_subscription_count; } @@ -199,8 +199,8 @@ PublisherBase::get_intra_process_subscription_count() const // TODO(ivanpauno): should this just return silently? Or maybe return with a warning? // Same as wjwwood comment in publisher_factory create_shared_publish_callback. throw std::runtime_error( - "intra process subscriber count called after " - "destruction of intra process manager"); + "Intra process subscriber count called after " + "destruction of intra process manager."); } return ipm->get_subscription_count(intra_process_publisher_id_); } @@ -210,7 +210,7 @@ PublisherBase::get_actual_qos() const { const rmw_qos_profile_t * qos = rcl_publisher_get_actual_qos(&publisher_handle_); if (!qos) { - auto msg = std::string("failed to get qos settings: ") + rcl_get_error_string().str; + auto msg = std::string("Failed to get qos settings: ") + rcl_get_error_string().str; rcl_reset_error(); throw std::runtime_error(msg); } @@ -235,14 +235,14 @@ PublisherBase::operator==(const rmw_gid_t * gid) const bool result = false; auto ret = rmw_compare_gids_equal(gid, &this->get_gid(), &result); if (ret != RMW_RET_OK) { - auto msg = std::string("failed to compare gids: ") + rmw_get_error_string().str; + auto msg = std::string("Failed to compare gids: ") + rmw_get_error_string().str; rmw_reset_error(); throw std::runtime_error(msg); } if (!result) { ret = rmw_compare_gids_equal(gid, &this->get_intra_process_gid(), &result); if (ret != RMW_RET_OK) { - auto msg = std::string("failed to compare gids: ") + rmw_get_error_string().str; + auto msg = std::string("Failed to compare gids: ") + rmw_get_error_string().str; rmw_reset_error(); throw std::runtime_error(msg); } @@ -266,7 +266,7 @@ PublisherBase::setup_intra_process( // Intraprocess configuration is not allowed with "durability" qos policy non "volatile". if (this->get_actual_qos().durability != RMW_QOS_POLICY_DURABILITY_VOLATILE) { throw std::invalid_argument( - "intraprocess communication is not allowed with durability qos policy non-volatile"); + "Intraprocess communication is not allowed with durability qos policy non-volatile"); } const char * topic_name = this->get_topic_name(); if (!topic_name) { @@ -292,7 +292,7 @@ PublisherBase::setup_intra_process( rcl_node_get_namespace(rcl_node_handle)); } - rclcpp::exceptions::throw_from_rcl_error(ret, "could not create intra process publisher"); + rclcpp::exceptions::throw_from_rcl_error(ret, "Could not create intra process publisher"); } intra_process_publisher_id_ = intra_process_publisher_id; @@ -311,8 +311,77 @@ PublisherBase::setup_intra_process( publisher_rmw_handle, &intra_process_rmw_gid_); if (rmw_ret != RMW_RET_OK) { auto msg = - std::string("failed to create intra process publisher gid: ") + rmw_get_error_string().str; + std::string("Failed to create intra process publisher gid: ") + rmw_get_error_string().str; rmw_reset_error(); throw std::runtime_error(msg); } } + +void +PublisherBase::publish(const rcl_serialized_message_t & serialized_msg) +{ + return this->do_serialized_publish(&serialized_msg); +} + +// Skip deprecated attribute in windows, as it raise a warning in template specialization. +#if !defined(_WIN32) +[[deprecated( + "Use publish(*serialized_msg). Check against nullptr before calling if necessary.")]] +#endif +void +PublisherBase::publish(const rcl_serialized_message_t * serialized_msg) +{ + return this->do_serialized_publish(serialized_msg); +} + +void +PublisherBase::do_serialized_publish(const rcl_serialized_message_t * serialized_msg) +{ + if (!serialized_msg) { + throw std::runtime_error("Cannot publisher msg which is a null pointer."); + } + + bool inter_process_publish_needed = + get_subscription_count() > get_intra_process_subscription_count(); + + if (intra_process_is_enabled_) { + auto ipm = weak_ipm_.lock(); + if (!ipm) { + throw std::runtime_error( + "Intra process publish called after destruction of intra process manager."); + } + const uint64_t message_seq = + ipm->template store_intra_process_message( + intra_process_publisher_id_, std::make_unique(*serialized_msg)); + this->do_intra_process_publish(message_seq); + } + + if (inter_process_publish_needed) { + auto status = rcl_publish_serialized_message(&publisher_handle_, serialized_msg, nullptr); + if (RCL_RET_OK != status) { + rclcpp::exceptions::throw_from_rcl_error(status, "Failed to publish serialized message."); + } + } +} + +void +PublisherBase::do_intra_process_publish(uint64_t message_seq) +{ + rcl_interfaces::msg::IntraProcessMessage ipm; + ipm.publisher_id = intra_process_publisher_id_; + ipm.message_sequence = message_seq; + auto status = rcl_publish(&intra_process_publisher_handle_, &ipm, nullptr); + if (RCL_RET_PUBLISHER_INVALID == status) { + rcl_reset_error(); // next call will reset error message if not context + if (rcl_publisher_is_valid_except_context(&intra_process_publisher_handle_)) { + rcl_context_t * context = rcl_publisher_get_context(&intra_process_publisher_handle_); + if (nullptr != context && !rcl_context_is_valid(context)) { + // publisher is invalid due to context being shutdown + return; + } + } + } + if (RCL_RET_OK != status) { + rclcpp::exceptions::throw_from_rcl_error(status, "Failed to publish intra process message."); + } +} diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp new file mode 100644 index 0000000000..059291f9ae --- /dev/null +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -0,0 +1,283 @@ +// Copyright 2019 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include +#include +#include +#include + +#include "rclcpp/exceptions.hpp" +#include "rclcpp/publisher.hpp" +#include "rclcpp/rclcpp.hpp" + + +int32_t & get_test_allocation_counter() +{ + static int32_t counter = 0; + return counter; +} + +std::shared_ptr make_serialized_string_msg( + const std::shared_ptr & stringMsg) +{ + auto m_allocator = rcutils_get_default_allocator(); + size_t message_size = 80u + static_cast(sizeof(rcl_interfaces::msg::IntraProcessMessage)); + + auto msg = new rcutils_uint8_array_t; + *msg = rcutils_get_zero_initialized_uint8_array(); + auto ret = rcutils_uint8_array_init(msg, message_size, &m_allocator); + if (ret != RCUTILS_RET_OK) { + throw std::runtime_error("Error allocating resources " + std::to_string(ret)); + } + + ++get_test_allocation_counter(); + auto serialized_data = std::shared_ptr( + msg, + [](rcutils_uint8_array_t * msg) { + --get_test_allocation_counter(); + int error = rcutils_uint8_array_fini(msg); + delete msg; + if (error != RCUTILS_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "test_intra_process_communication", + "Leaking memory %i", + error); + } + }); + + serialized_data->buffer_length = message_size; + + static auto type = + rosidl_typesupport_cpp::get_message_type_support_handle + (); + auto error = rmw_serialize(stringMsg.get(), type, serialized_data.get()); + if (error != RMW_RET_OK) { + RCUTILS_LOG_ERROR_NAMED("test_intra_process_communication", + "Something went wrong preparing the serialized message"); + } + + return serialized_data; +} + +/** + * Parameterized test. + * The first param are the NodeOptions used to create the nodes. + * The second param are the expect intraprocess count results. + */ +struct TestParameters +{ + rclcpp::NodeOptions node_options[2]; + uint64_t intraprocess_count_results[2]; + std::string description; +}; + +std::ostream & operator<<(std::ostream & out, const TestParameters & params) +{ + out << params.description; + return out; +} + +class TestPublisherSubscriptionSerialized : public ::testing::TestWithParam +{ +public: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } + +protected: + static std::chrono::milliseconds offset; +}; + +std::chrono::milliseconds TestPublisherSubscriptionSerialized::offset = std::chrono::milliseconds( + 2000); +std::array counts; + +void OnMessageSerialized(const std::shared_ptr msg) +{ + EXPECT_NE(msg->buffer, nullptr); + EXPECT_GT(msg->buffer_capacity, 0u); + + ++counts[0]; +} + +void OnMessageSerializedU(std::unique_ptr msg) +{ + EXPECT_NE(msg, nullptr); + EXPECT_NE(msg->buffer, nullptr); + EXPECT_GT(msg->buffer_capacity, 0u); + + ++counts[0]; +} + +void OnMessageConst(std::shared_ptr msg) +{ + EXPECT_EQ(msg->message_sequence, 1234u); + + ++counts[1]; +} + +void OnMessageU(std::unique_ptr msg) +{ + EXPECT_EQ(msg->message_sequence, 1234u); + + ++counts[1]; +} + +void OnMessage(std::shared_ptr msg) +{ + EXPECT_EQ(msg->message_sequence, 1234u); + + ++counts[1]; +} + +TEST_P(TestPublisherSubscriptionSerialized, publish_serialized) +{ + TestParameters parameters = GetParam(); + rclcpp::Node::SharedPtr node = std::make_shared( + "my_node", + "/ns", + parameters.node_options[0]); + auto publisher = node->create_publisher("/topic", 10); + + auto mem_strategy = + rclcpp::message_memory_strategy::MessageMemoryStrategy + ::create_default(); + + auto stringMsg = std::make_shared(); + stringMsg->message_sequence = 1234u; + + { + auto msg0 = make_serialized_string_msg(stringMsg); + + auto sub = node->create_subscription("/topic", 10, + &OnMessage); + auto sub1 = node->create_subscription("/topic", 10, + &OnMessageU); + auto sub2 = node->create_subscription("/topic", 10, + &OnMessageConst); + auto sub_ser = node->create_subscription("/topic", 10, + &OnMessageSerialized); + auto sub_ser2 = node->create_subscription("/topic", + 10, + &OnMessageSerialized); + auto sub_ser3 = node->create_subscription("/topic", + 10, + &OnMessageSerializedU); + + rclcpp::sleep_for(offset); + + counts.fill(0); + + std::unique_ptr stringMsgU( + new rcl_interfaces::msg::IntraProcessMessage( + *stringMsg)); + std::unique_ptr msg0U(new rcutils_uint8_array_t(*msg0)); + + // Now deprecated functions. +#if !defined(_WIN32) +# pragma GCC diagnostic push +# pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#else // !defined(_WIN32) +# pragma warning(push) +# pragma warning(disable: 4996) +#endif + publisher->publish(*msg0); + publisher->publish(stringMsg); + publisher->publish(*msg0U); + publisher->publish(std::move(stringMsgU)); +#if !defined(_WIN32) +# pragma GCC diagnostic pop +#else // !defined(_WIN32) +# pragma warning(pop) +#endif + + rclcpp::spin_some(node); + rclcpp::sleep_for(offset); + + rclcpp::spin_some(node); + + EXPECT_EQ(counts[0], 12u); + EXPECT_EQ(counts[1], 12u); + } + + EXPECT_EQ(get_test_allocation_counter(), 0); +} + +auto get_new_context() +{ + auto context = rclcpp::Context::make_shared(); + context->init(0, nullptr); + return context; +} + +TestParameters parameters[] = { + /* + Testing publisher subscription count api and internal process subscription count. + Two subscriptions in the same topic, both using intraprocess comm. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(true), + rclcpp::NodeOptions().use_intra_process_comms(true) + }, + {1u, 2u}, + "two_subscriptions_intraprocess_comm" + }, + /* + Testing publisher subscription count api and internal process subscription count. + Two subscriptions, one using intra-process comm and the other not using it. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(true), + rclcpp::NodeOptions().use_intra_process_comms(false) + }, + {1u, 1u}, + "two_subscriptions_one_intraprocess_one_not" + }, + /* + Testing publisher subscription count api and internal process subscription count. + Two contexts, both using intra-process. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(true), + rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(true) + }, + {1u, 1u}, + "two_subscriptions_in_two_contexts_with_intraprocess_comm" + }, + /* + Testing publisher subscription count api and internal process subscription count. + Two contexts, both of them not using intra-process comm. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(false), + rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(false) + }, + {0u, 0u}, + "two_subscriptions_in_two_contexts_without_intraprocess_comm" + } +}; + +INSTANTIATE_TEST_CASE_P( + TestWithDifferentNodeOptions, TestPublisherSubscriptionSerialized, + ::testing::ValuesIn(parameters), + ::testing::PrintToStringParamName()); diff --git a/rclcpp/test/test_intra_process_manager.cpp b/rclcpp/test/test_intra_process_manager.cpp index fe3c535107..e2964c5a86 100644 --- a/rclcpp/test/test_intra_process_manager.cpp +++ b/rclcpp/test/test_intra_process_manager.cpp @@ -91,7 +91,10 @@ class Publisher : public PublisherBase return mapped_ring_buffer::MappedRingBuffer< T, typename Publisher::MessageAlloc - >::make_shared(size, allocator_); + >::make_shared( + size, + rosidl_typesupport_cpp::get_message_type_support_handle(), + allocator_); } std::shared_ptr get_allocator() diff --git a/rclcpp/test/test_mapped_ring_buffer.cpp b/rclcpp/test/test_mapped_ring_buffer.cpp index 5625804d13..e87baa4a11 100644 --- a/rclcpp/test/test_mapped_ring_buffer.cpp +++ b/rclcpp/test/test_mapped_ring_buffer.cpp @@ -25,9 +25,10 @@ */ TEST(TestMappedRingBuffer, empty) { // Cannot create a buffer of size zero. - EXPECT_THROW(rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(0), std::invalid_argument); + EXPECT_THROW(rclcpp::mapped_ring_buffer::MappedRingBuffer + mrb(0, nullptr), std::invalid_argument); // Getting or popping an empty buffer should result in a nullptr. - rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(1); + rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(1, nullptr); std::unique_ptr unique; mrb.get(1, unique); @@ -49,7 +50,7 @@ TEST(TestMappedRingBuffer, empty) { get and pop methods with shared_ptr signature. */ TEST(TestMappedRingBuffer, temporary_l_value_with_shared_get_pop) { - rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); + rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2, nullptr); // Pass in value with temporary object mrb.push_and_replace(1, std::shared_ptr(new char('a'))); @@ -69,7 +70,7 @@ TEST(TestMappedRingBuffer, temporary_l_value_with_shared_get_pop) { get and pop methods with unique_ptr signature. */ TEST(TestMappedRingBuffer, temporary_l_value_with_unique_get_pop) { - rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); + rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2, nullptr); // Pass in value with temporary object mrb.push_and_replace(1, std::shared_ptr(new char('a'))); @@ -89,7 +90,7 @@ TEST(TestMappedRingBuffer, temporary_l_value_with_unique_get_pop) { Using shared push_and_replace, get and pop methods. */ TEST(TestMappedRingBuffer, nominal_push_shared_get_pop_shared) { - rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); + rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2, nullptr); std::shared_ptr expected(new char('a')); EXPECT_FALSE(mrb.push_and_replace(1, expected)); @@ -145,7 +146,7 @@ TEST(TestMappedRingBuffer, nominal_push_shared_get_pop_shared) { Using shared push_and_replace, unique get and pop methods. */ TEST(TestMappedRingBuffer, nominal_push_shared_get_pop_unique) { - rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); + rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2, nullptr); std::shared_ptr expected(new char('a')); const char * expected_orig = expected.get(); @@ -207,7 +208,7 @@ TEST(TestMappedRingBuffer, nominal_push_shared_get_pop_unique) { Using unique push_and_replace, get and pop methods. */ TEST(TestMappedRingBuffer, nominal_push_unique_get_pop_unique) { - rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); + rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2, nullptr); std::unique_ptr expected(new char('a')); const char * expected_orig = expected.get(); @@ -258,7 +259,7 @@ TEST(TestMappedRingBuffer, nominal_push_unique_get_pop_unique) { Using unique push_and_replace, shared get and pop methods. */ TEST(TestMappedRingBuffer, nominal_push_unique_get_pop_shared) { - rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); + rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2, nullptr); std::unique_ptr expected(new char('a')); const char * expected_orig = expected.get(); @@ -308,7 +309,7 @@ TEST(TestMappedRingBuffer, nominal_push_unique_get_pop_shared) { Tests the affect of reusing keys (non-unique keys) in a mrb. */ TEST(TestMappedRingBuffer, non_unique_keys) { - rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); + rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2, nullptr); std::shared_ptr input(new char('a')); mrb.push_and_replace(1, input);