diff --git a/source/LibMultiSense/details/legacy/channel.cc b/source/LibMultiSense/details/legacy/channel.cc index 5afc2435..d772c60b 100644 --- a/source/LibMultiSense/details/legacy/channel.cc +++ b/source/LibMultiSense/details/legacy/channel.cc @@ -727,7 +727,8 @@ std::optional LegacyChannel::get_system_status() const auto message_stats = m_message_assembler.get_message_statistics(); MultiSenseStatus::ClientNetworkStatus client_stats{message_stats.received_messages, message_stats.dropped_messages, - message_stats.invalid_packets}; + message_stats.invalid_packets, + (m_udp_receiver ? m_udp_receiver->dropped_packets() : 0)}; return MultiSenseStatus{system_ok(status->message), (ptp_status ? std::make_optional(convert(ptp_status.value())) : std::nullopt), diff --git a/source/LibMultiSense/details/legacy/storage.cc b/source/LibMultiSense/details/legacy/storage.cc index 0b06ba80..73f11902 100644 --- a/source/LibMultiSense/details/legacy/storage.cc +++ b/source/LibMultiSense/details/legacy/storage.cc @@ -44,90 +44,116 @@ namespace multisense{ namespace legacy{ namespace { - constexpr size_t NUM_ALLOCATION_RETRIES = 5; -} -BufferPool::BufferPool(const BufferPoolConfig &config): - m_config(config) +constexpr size_t NUM_ALLOCATION_RETRIES = 5; + +std::pair< + std::vector>, + std::vector> +allocate_buffers( size_t count, size_t buffer_size) { + std::vector> storage{}; + std::vector free_list{}; - for (size_t i = 0 ; i < config.num_small_buffers ; ++i) + storage.reserve(count); + for (size_t i = 0 ; i < count ; ++i) { - for (size_t t = 0; t < NUM_ALLOCATION_RETRIES ; ++t) + bool allocated = false; + for (size_t attempt = 0; attempt < NUM_ALLOCATION_RETRIES; ++attempt) { try { - auto small_buffer = std::make_shared>(); - small_buffer->reserve(config.small_buffer_size); - m_small_buffers.emplace_back(std::move(small_buffer)); + storage.emplace_back(buffer_size, 0); + allocated = true; break; } catch(const std::exception &e) { - (void) e; - CRL_DEBUG("Failed to allocate small buffer. Retrying\n"); + CRL_DEBUG("Failed to allocate buffers: %s. Retrying\n", e.what()); } } - } - for (size_t i = 0 ; i < config.num_large_buffers ; ++i) - { - for (size_t t = 0; t < NUM_ALLOCATION_RETRIES ; ++t) + if (!allocated) { - try - { - auto large_buffer = std::make_shared>(); - large_buffer->reserve(config.large_buffer_size); - m_large_buffers.emplace_back(std::move(large_buffer)); - break; - } - catch(const std::exception &e) - { - (void) e; - CRL_DEBUG("Failed to allocate large buffer\n"); - } + CRL_EXCEPTION("Failed to allocate buffers"); } } - if (m_small_buffers.size() != config.num_small_buffers || m_large_buffers.size() != config.num_large_buffers) + free_list.reserve(count); + for (size_t i = 0 ; i < count ; ++i) { - CRL_EXCEPTION("Failed to allocate buffers"); + free_list.emplace_back(i); } + + return std::make_pair(std::move(storage), std::move(free_list)); +} + +} + +BufferPool::BufferPool(const BufferPoolConfig &config): + m_config(config) +{ + std::tie(m_small_buffers, m_small_free_list) = allocate_buffers(config.num_small_buffers, config.small_buffer_size); + std::tie(m_large_buffers, m_large_free_list) = allocate_buffers(config.num_large_buffers, config.large_buffer_size); } std::shared_ptr> BufferPool::get_buffer(size_t target_size) { if (target_size <= m_config.small_buffer_size) { - const auto &small_buffer = std::find_if(std::begin(m_small_buffers), std::end(m_small_buffers), - [](const auto &small_buffer) - { - return small_buffer.use_count() == 1; - }); - - if (small_buffer != std::end(m_small_buffers)) - { - (*small_buffer)->resize(target_size, 0); - return *small_buffer; - } + return acquire_buffer(BufferType::Small, target_size); } else if (target_size <= m_config.large_buffer_size) { - const auto &large_buffer = std::find_if(std::begin(m_large_buffers), std::end(m_large_buffers), - [](const auto &large_buffer) - { - return large_buffer.use_count() == 1; - }); - - if (large_buffer != std::end(m_large_buffers)) - { - (*large_buffer)->resize(target_size, 0); - return *large_buffer; - } + return acquire_buffer(BufferType::Large, target_size); } return nullptr; } +std::shared_ptr> BufferPool::acquire_buffer(BufferType type, size_t target_size) +{ + std::unique_lock lock(m_mutex); + + auto& free_list = (type == BufferType::Small) ? m_small_free_list : m_large_free_list; + auto& storage = (type == BufferType::Small) ? m_small_buffers : m_large_buffers; + + if (free_list.empty()) + { + return nullptr; + } + + const size_t index = free_list.back(); + free_list.pop_back(); + auto* buffer = &storage[index]; + lock.unlock(); + + buffer->resize(target_size, 0); + + const auto self = shared_from_this(); + + return std::shared_ptr>(buffer, + [self, this, type, index](std::vector* released_buffer) + { + this->release_buffer(type, index, released_buffer); + }); +} + +void BufferPool::release_buffer(BufferType type, size_t index, std::vector* buffer) +{ + const size_t reserve_size = (type == BufferType::Small) ? + m_config.small_buffer_size : + m_config.large_buffer_size; + + buffer->clear(); + buffer->resize(0); + buffer->reserve(reserve_size); + + std::lock_guard lock(m_mutex); + + auto& free_list = (type == BufferType::Small) ? m_small_free_list : m_large_free_list; + free_list.emplace_back(index); +} + } } diff --git a/source/LibMultiSense/details/legacy/udp.cc b/source/LibMultiSense/details/legacy/udp.cc index eb5cb5c3..774f5a93 100644 --- a/source/LibMultiSense/details/legacy/udp.cc +++ b/source/LibMultiSense/details/legacy/udp.cc @@ -34,6 +34,7 @@ * 2024-12-24, malvarado@carnegierobotics.com, IRAD, Created file. **/ +#include #include #include @@ -46,25 +47,36 @@ namespace legacy{ UdpReceiver::UdpReceiver(const NetworkSocket &socket, size_t max_mtu, - std::function&)> receive_callback): + std::function&)> receive_callback, + size_t max_packet_queue_depth): m_socket(socket.sensor_socket), m_stop(false), m_max_mtu(max_mtu), - m_incoming_buffer(max_mtu, 0), + m_packet_buffers(max_packet_queue_depth, std::vector(max_mtu, 0)), m_receive_callback(receive_callback) - { + for (size_t i = 0; i < m_packet_buffers.size(); ++i) + { + m_free_buffers.emplace_back(i); + } + m_rx_thread = std::thread(&UdpReceiver::rx_thread, this); + m_dispatch_thread = std::thread(&UdpReceiver::dispatch_thread, this); } UdpReceiver::~UdpReceiver() { - if(!m_stop.exchange(true)) + m_stop.store(true); + m_queue_valid.notify_all(); + + if (m_rx_thread.joinable()) { - if (m_rx_thread.joinable()) - { - m_rx_thread.join(); - } + m_rx_thread.join(); + } + + if (m_dispatch_thread.joinable()) + { + m_dispatch_thread.join(); } } @@ -93,6 +105,18 @@ void UdpReceiver::rx_thread() // while(true) { + size_t buffer_index = 0; + { + std::lock_guard lock(m_queue_mutex); + if (m_free_buffers.empty()) + { + ++m_dropped_packets; + continue; + } + + buffer_index = m_free_buffers.front(); + m_free_buffers.pop_front(); + } try { @@ -100,9 +124,11 @@ void UdpReceiver::rx_thread() #pragma warning (push) #pragma warning (disable : 4267) #endif + auto& buffer = m_packet_buffers[buffer_index]; + buffer.resize(m_max_mtu); const int bytes_read = recvfrom(m_socket, - reinterpret_cast(m_incoming_buffer.data()), - m_incoming_buffer.size(), + reinterpret_cast(buffer.data()), + buffer.size(), 0, NULL, NULL); #if defined(WIN32) && !defined(__MINGW64__) #pragma warning (pop) @@ -112,28 +138,75 @@ void UdpReceiver::rx_thread() // if (bytes_read < 0) { + std::lock_guard lock(m_queue_mutex); + m_free_buffers.emplace_back(buffer_index); break; } - m_incoming_buffer.resize(bytes_read); - m_receive_callback(m_incoming_buffer); - m_incoming_buffer.resize(m_max_mtu); + buffer.resize(static_cast(bytes_read)); + + { + std::lock_guard lock(m_queue_mutex); + m_ready_buffers.emplace_back(buffer_index); + } + + m_queue_valid.notify_one(); } catch (const std::exception& e) { - CRL_DEBUG("exception while decoding packet: %s\n", e.what()); - } catch ( ... ) { - CRL_DEBUG_RAW("unknown exception while decoding packet\n"); } } } } +void UdpReceiver::dispatch_thread() +{ + while(true) + { + size_t buffer_index = 0; + { + std::unique_lock lock(m_queue_mutex); + m_queue_valid.wait(lock, [this]() + { + return m_stop || !m_ready_buffers.empty(); + }); + + if (m_stop && m_ready_buffers.empty()) + { + break; + } + + buffer_index = m_ready_buffers.front(); + m_ready_buffers.pop_front(); + } + + try + { + auto& packet = m_packet_buffers[buffer_index]; + m_receive_callback(packet); + } + catch (const std::exception& e) + { + CRL_DEBUG("exception while decoding packet: %s\n", e.what()); + } + catch ( ... ) + { + CRL_DEBUG_RAW("unknown exception while decoding packet\n"); + } + + { + std::lock_guard lock(m_queue_mutex); + m_packet_buffers[buffer_index].resize(m_max_mtu); + m_free_buffers.emplace_back(buffer_index); + } + } +} + int64_t publish_data(const NetworkSocket &socket, const std::vector &data) { // disable MSVC warning for narrowing conversion. diff --git a/source/LibMultiSense/include/MultiSense/MultiSenseTypes.hh b/source/LibMultiSense/include/MultiSense/MultiSenseTypes.hh index ffe10cbd..ad099303 100644 --- a/source/LibMultiSense/include/MultiSense/MultiSenseTypes.hh +++ b/source/LibMultiSense/include/MultiSense/MultiSenseTypes.hh @@ -1217,6 +1217,12 @@ struct MultiSenseStatus /// @brief The total number of invalid packets received on the client side /// size_t invalid_packets = 0; + + /// + /// @brief The total number of packets we received over the wire, but were unable to process due to + /// buffer limits + /// + size_t unprocessed_packets = 0; }; struct TimeStatus diff --git a/source/LibMultiSense/include/details/legacy/storage.hh b/source/LibMultiSense/include/details/legacy/storage.hh index b5b9f615..deed5f78 100644 --- a/source/LibMultiSense/include/details/legacy/storage.hh +++ b/source/LibMultiSense/include/details/legacy/storage.hh @@ -55,7 +55,7 @@ struct BufferPoolConfig /// @brief Object to handle the management and delivery of buffers to used to store incoming data without /// needing to continually reallocate internal memory. This class is threadsafe /// -class BufferPool +class BufferPool : public std::enable_shared_from_this { public: @@ -69,12 +69,6 @@ public: BufferPool(const BufferPool&) = delete; BufferPool& operator=(const BufferPool&) = delete; - /// - /// Movable - /// - BufferPool(BufferPool&&) noexcept = default; - BufferPool& operator=(BufferPool&&) noexcept = default; - /// /// @brief Get a buffer which will contain at least target_size bytes of storage /// @@ -90,20 +84,51 @@ public: private: + enum class BufferType + { + Small, + Large + }; + + /// + /// @brief Acquire a buffer from the pool + /// + std::shared_ptr> acquire_buffer(BufferType type, size_t target_size); + + /// + /// @brief Release a buffer back to the bool + /// + void release_buffer(BufferType type, size_t index, std::vector* buffer); + /// /// @brief The configured numbers and sizes of our internal buffers /// BufferPoolConfig m_config; /// - /// @brief The collection of small buffers + /// @brief Handle acquisition of buffers in a thread safe manner + /// + mutable std::mutex m_mutex; + + /// + /// @brief The collection of small buffers which are in use + /// + std::vector> m_small_buffers; + + /// + /// @brief The collection of small buffers which are free to use + /// + std::vector m_small_free_list; + + /// + /// @brief The collection of large buffers which are in use /// - std::vector>> m_small_buffers; + std::vector> m_large_buffers; /// - /// @brief The collection of large buffers + /// @brief The collection of large buffers which are free to use /// - std::vector>> m_large_buffers; + std::vector m_large_free_list; }; diff --git a/source/LibMultiSense/include/details/legacy/udp.hh b/source/LibMultiSense/include/details/legacy/udp.hh index 8b9f410c..3f329d2f 100644 --- a/source/LibMultiSense/include/details/legacy/udp.hh +++ b/source/LibMultiSense/include/details/legacy/udp.hh @@ -37,6 +37,9 @@ #pragma once #include +#include +#include +#include #include #include @@ -72,10 +75,16 @@ public: /// UdpReceiver(const NetworkSocket &socket, size_t max_mtu, - std::function&)> receive_callback); + std::function&)> receive_callback, + size_t max_packet_queue_depth=64); ~UdpReceiver(); + size_t dropped_packets() const + { + return m_dropped_packets; + } + private: /// @@ -83,35 +92,71 @@ private: /// void rx_thread(); + /// + /// @brief The dispatch thread function which is sends UDP data to clients + /// + void dispatch_thread(); + /// /// @brief The internal socket which UDP data is receive on /// socket_t m_socket; /// - /// @brief The rx_thread object which is spawned on construction + /// @brief The rx_thread object which is spawned on construction to receive UDP data /// std::thread m_rx_thread; + /// + /// @brief The dispatch_thread object which is spawned on construction to dispatch data to clients + /// + std::thread m_dispatch_thread; + /// /// @brief Atomic flag to stop the rx_thread on destruction /// std::atomic_bool m_stop{false}; + /// + /// @brief Signal for coordinating when the processing queue is valid + /// + std::condition_variable m_queue_valid; + + /// + /// @brief Mutext for coordinating when data is ready to process + /// + std::mutex m_queue_mutex; + /// /// @brief The amount of data to read off the socket during each read operation /// size_t m_max_mtu = 0; /// - /// @brief Internal buffer used to write incoming UDP data into + /// @brief queue used to send data between the rx and dispatch threads. The queue stores indices + /// into m_packet_buffers to avoid copying the payload + /// + std::vector> m_packet_buffers; + + /// + /// @brief indices of buffers which are free to write to in m_packet_buffers /// - std::vector m_incoming_buffer; + std::deque m_free_buffers; + + /// + /// @brief indices of buffers which are ready to process in m_packet_buffers + /// + std::deque m_ready_buffers; /// /// @brief User specified callback which is called once UDP data is received /// std::function&)> m_receive_callback; + + /// + /// @brief Counter for messages dropped due to dispatch processing slowdowsn + /// + std::atomic_size_t m_dropped_packets = 0; }; diff --git a/source/LibMultiSense/test/storage_test.cc b/source/LibMultiSense/test/storage_test.cc index d5a05b9f..84a79cab 100644 --- a/source/LibMultiSense/test/storage_test.cc +++ b/source/LibMultiSense/test/storage_test.cc @@ -42,17 +42,17 @@ using namespace multisense::legacy; TEST(BufferPool, null_construction) { - BufferPool pool(BufferPoolConfig{0, 0, 0, 0}); + auto pool = std::make_shared(BufferPoolConfig{0, 0, 0, 0}); - ASSERT_EQ(pool.get_buffer(1), nullptr); + ASSERT_EQ(pool->get_buffer(1), nullptr); } TEST(BufferPool, valid_construction) { - BufferPool pool(BufferPoolConfig{1, 10, 1, 30}); + auto pool = std::make_shared(BufferPoolConfig{1, 10, 1, 30}); - const auto small_buffer = pool.get_buffer(2); - const auto large_buffer = pool.get_buffer(20); + const auto small_buffer = pool->get_buffer(2); + const auto large_buffer = pool->get_buffer(20); ASSERT_NE(small_buffer, nullptr); ASSERT_NE(large_buffer, nullptr); @@ -60,16 +60,16 @@ TEST(BufferPool, valid_construction) // // At this point we should be out of buffers // - ASSERT_EQ(pool.get_buffer(20), nullptr); - ASSERT_EQ(pool.get_buffer(2), nullptr); + ASSERT_EQ(pool->get_buffer(20), nullptr); + ASSERT_EQ(pool->get_buffer(2), nullptr); } TEST(BufferPool, buffer_to_large) { - BufferPool pool(BufferPoolConfig{1, 10, 1, 30}); + auto pool = std::make_shared(BufferPoolConfig{1, 10, 1, 30}); // // We should have no buffer of size 40 // - ASSERT_EQ(pool.get_buffer(40), nullptr); + ASSERT_EQ(pool->get_buffer(40), nullptr); }