From 6e4328122522dfb6784d8a2cad2c39cbfc2f6b9d Mon Sep 17 00:00:00 2001 From: Matt Alvarado Date: Sat, 25 Oct 2025 20:22:02 -0400 Subject: [PATCH 1/7] Add a separate dispatch thread to ensure all UDP data is received --- source/LibMultiSense/details/legacy/udp.cc | 77 ++++++++++++++++--- .../include/details/legacy/udp.hh | 38 ++++++++- 2 files changed, 101 insertions(+), 14 deletions(-) diff --git a/source/LibMultiSense/details/legacy/udp.cc b/source/LibMultiSense/details/legacy/udp.cc index fdb4c175..91c365a2 100644 --- a/source/LibMultiSense/details/legacy/udp.cc +++ b/source/LibMultiSense/details/legacy/udp.cc @@ -46,25 +46,33 @@ namespace legacy{ UdpReceiver::UdpReceiver(const NetworkSocket &socket, size_t max_mtu, - std::function&)> receive_callback): + std::function&)> receive_callback, + size_t max_packet_queue_depth): m_socket(socket.sensor_socket), m_stop(false), + m_max_packet_queue_depth(max_packet_queue_depth), m_max_mtu(max_mtu), m_incoming_buffer(max_mtu, 0), m_receive_callback(receive_callback) { m_rx_thread = std::thread(&UdpReceiver::rx_thread, this); + m_dispatch_thread = std::thread(&UdpReceiver::dispatch_thread, this); } UdpReceiver::~UdpReceiver() { - if(!m_stop.exchange(true)) + m_stop.store(true); + m_queue_cv.notify_all(); + + if (m_rx_thread.joinable()) { - if (m_rx_thread.joinable()) - { - m_rx_thread.join(); - } + m_rx_thread.join(); + } + + if (m_dispatch_thread.joinable()) + { + m_dispatch_thread.join(); } } @@ -115,25 +123,70 @@ void UdpReceiver::rx_thread() break; } - m_incoming_buffer.resize(bytes_read); - m_receive_callback(m_incoming_buffer); - m_incoming_buffer.resize(m_max_mtu); + std::vector packet(static_cast(bytes_read)); + memcpy(packet.data(), m_incoming_buffer.data(), static_cast(bytes_read)); + + { + std::lock_guard lock(m_queue_mutex); + if (m_packet_queue.size() >= m_max_packet_queue_depth) + { + m_packet_queue.pop_front(); + CRL_DEBUG("UDP receiver queue overrun, dropping oldest packet\n"); + } + + m_packet_queue.emplace_back(std::move(packet)); + } + + m_queue_cv.notify_one(); } catch (const std::exception& e) { - CRL_DEBUG("exception while decoding packet: %s\n", e.what()); - } catch ( ... ) { - CRL_DEBUG_RAW("unknown exception while decoding packet\n"); } } } } +void UdpReceiver::dispatch_thread() +{ + while(true) + { + std::vector packet; + { + std::unique_lock lock(m_queue_mutex); + m_queue_cv.wait(lock, [this]() + { + return m_stop || !m_packet_queue.empty(); + }); + + if (m_stop && m_packet_queue.empty()) + { + break; + } + + packet = std::move(m_packet_queue.front()); + m_packet_queue.pop_front(); + } + + try + { + m_receive_callback(packet); + } + catch (const std::exception& e) + { + CRL_DEBUG("exception while decoding packet: %s\n", e.what()); + } + catch ( ... ) + { + CRL_DEBUG_RAW("unknown exception while decoding packet\n"); + } + } +} + int64_t publish_data(const NetworkSocket &socket, const std::vector &data) { // disable MSVC warning for narrowing conversion. diff --git a/source/LibMultiSense/include/details/legacy/udp.hh b/source/LibMultiSense/include/details/legacy/udp.hh index 8b9f410c..5b05a179 100644 --- a/source/LibMultiSense/include/details/legacy/udp.hh +++ b/source/LibMultiSense/include/details/legacy/udp.hh @@ -37,6 +37,9 @@ #pragma once #include +#include +#include +#include #include #include @@ -72,7 +75,8 @@ public: /// UdpReceiver(const NetworkSocket &socket, size_t max_mtu, - std::function&)> receive_callback); + std::function&)> receive_callback, + size_t max_packet_queue_depth=64); ~UdpReceiver(); @@ -83,21 +87,51 @@ private: /// void rx_thread(); + /// + /// @brief The dispatch thread function which is sends UDP data to clients + /// + void dispatch_thread(); + /// /// @brief The internal socket which UDP data is receive on /// socket_t m_socket; /// - /// @brief The rx_thread object which is spawned on construction + /// @brief The rx_thread object which is spawned on construction to receive UDP data /// std::thread m_rx_thread; + /// + /// @brief The dispatch_thread object which is spawned on construction to dispatch data to clients + /// + std::thread m_dispatch_thread; + /// /// @brief Atomic flag to stop the rx_thread on destruction /// std::atomic_bool m_stop{false}; + /// + /// @brief condition variable to notify dispatch there is data on the queue + /// + std::condition_variable m_queue_cv; + + /// + /// @brief condition variable to notify dispatch there is data on the queue + /// + std::mutex m_queue_mutex; + + /// + /// @brief queue used to send data between the rx and dispatch threads + /// + std::deque> m_packet_queue; + + /// + /// @brief the max size of the m_packet_queue + /// + size_t m_max_packet_queue_depth = 64; + /// /// @brief The amount of data to read off the socket during each read operation /// From 1be96b91c278a2682268411329fd49d61e1a80e9 Mon Sep 17 00:00:00 2001 From: Matt Alvarado Date: Mon, 27 Oct 2025 15:41:13 -0400 Subject: [PATCH 2/7] Improve buffer managment for handling incoming data from LibMultiSense --- .../LibMultiSense/details/legacy/storage.cc | 136 ++++++++++-------- source/LibMultiSense/details/legacy/udp.cc | 63 +++++--- .../include/details/legacy/storage.hh | 38 ++++- .../include/details/legacy/udp.hh | 31 ++-- 4 files changed, 175 insertions(+), 93 deletions(-) diff --git a/source/LibMultiSense/details/legacy/storage.cc b/source/LibMultiSense/details/legacy/storage.cc index 0b06ba80..ba005b6d 100644 --- a/source/LibMultiSense/details/legacy/storage.cc +++ b/source/LibMultiSense/details/legacy/storage.cc @@ -51,83 +51,105 @@ BufferPool::BufferPool(const BufferPoolConfig &config): m_config(config) { - for (size_t i = 0 ; i < config.num_small_buffers ; ++i) - { - for (size_t t = 0; t < NUM_ALLOCATION_RETRIES ; ++t) + const auto allocate_buffers = + [](std::vector>& storage, + std::vector& free_list, + size_t count, + size_t buffer_size, + const char* debug_label) { - try - { - auto small_buffer = std::make_shared>(); - small_buffer->reserve(config.small_buffer_size); - m_small_buffers.emplace_back(std::move(small_buffer)); - break; - } - catch(const std::exception &e) + storage.reserve(count); + for (size_t i = 0 ; i < count ; ++i) { - (void) e; - CRL_DEBUG("Failed to allocate small buffer. Retrying\n"); - } - } - } + bool allocated = false; + for (size_t attempt = 0; attempt < NUM_ALLOCATION_RETRIES; ++attempt) + { + try + { + storage.emplace_back(buffer_size, 0); + allocated = true; + break; + } + catch(const std::exception &e) + { + CRL_DEBUG("Failed to allocate %s buffer: %s. Retrying\n", debug_label, e.what()); + } + } - for (size_t i = 0 ; i < config.num_large_buffers ; ++i) - { - for (size_t t = 0; t < NUM_ALLOCATION_RETRIES ; ++t) - { - try - { - auto large_buffer = std::make_shared>(); - large_buffer->reserve(config.large_buffer_size); - m_large_buffers.emplace_back(std::move(large_buffer)); - break; + if (!allocated) + { + CRL_EXCEPTION("Failed to allocate %s buffers", debug_label); + } } - catch(const std::exception &e) + + free_list.reserve(count); + for (size_t i = 0 ; i < count ; ++i) { - (void) e; - CRL_DEBUG("Failed to allocate large buffer\n"); + free_list.emplace_back(i); } - } - } + }; - if (m_small_buffers.size() != config.num_small_buffers || m_large_buffers.size() != config.num_large_buffers) - { - CRL_EXCEPTION("Failed to allocate buffers"); - } + allocate_buffers(m_small_buffers, m_small_free_list, config.num_small_buffers, config.small_buffer_size, "small"); + allocate_buffers(m_large_buffers, m_large_free_list, config.num_large_buffers, config.large_buffer_size, "large"); } std::shared_ptr> BufferPool::get_buffer(size_t target_size) { if (target_size <= m_config.small_buffer_size) { - const auto &small_buffer = std::find_if(std::begin(m_small_buffers), std::end(m_small_buffers), - [](const auto &small_buffer) - { - return small_buffer.use_count() == 1; - }); - - if (small_buffer != std::end(m_small_buffers)) - { - (*small_buffer)->resize(target_size, 0); - return *small_buffer; - } + return acquire_buffer(BufferType::Small, target_size); } else if (target_size <= m_config.large_buffer_size) { - const auto &large_buffer = std::find_if(std::begin(m_large_buffers), std::end(m_large_buffers), - [](const auto &large_buffer) - { - return large_buffer.use_count() == 1; - }); - - if (large_buffer != std::end(m_large_buffers)) - { - (*large_buffer)->resize(target_size, 0); - return *large_buffer; - } + return acquire_buffer(BufferType::Large, target_size); } return nullptr; } +std::shared_ptr> BufferPool::acquire_buffer(BufferType type, size_t target_size) +{ + std::unique_lock lock(m_mutex); + + auto& free_list = (type == BufferType::Small) ? m_small_free_list : m_large_free_list; + auto& storage = (type == BufferType::Small) ? m_small_buffers : m_large_buffers; + + if (free_list.empty()) + { + return nullptr; + } + + const size_t index = free_list.back(); + free_list.pop_back(); + auto* buffer = &storage[index]; + lock.unlock(); + + buffer->resize(target_size, 0); + + const auto self = shared_from_this(); + + return std::shared_ptr>(buffer, + [self, this, type, index](std::vector* released_buffer) + { + this->release_buffer(type, index, released_buffer); + }); +} + +void BufferPool::release_buffer(BufferType type, size_t index, std::vector* buffer) +{ + const size_t reserve_size = (type == BufferType::Small) ? + m_config.small_buffer_size : + m_config.large_buffer_size; + + buffer->clear(); + buffer->resize(0); + buffer->reserve(reserve_size); + + std::lock_guard lock(m_mutex); + + auto& free_list = (type == BufferType::Small) ? m_small_free_list : m_large_free_list; + free_list.emplace_back(index); +} + } } diff --git a/source/LibMultiSense/details/legacy/udp.cc b/source/LibMultiSense/details/legacy/udp.cc index 91c365a2..7e71ee38 100644 --- a/source/LibMultiSense/details/legacy/udp.cc +++ b/source/LibMultiSense/details/legacy/udp.cc @@ -34,6 +34,7 @@ * 2024-12-24, malvarado@carnegierobotics.com, IRAD, Created file. **/ +#include #include #include @@ -52,10 +53,14 @@ UdpReceiver::UdpReceiver(const NetworkSocket &socket, m_stop(false), m_max_packet_queue_depth(max_packet_queue_depth), m_max_mtu(max_mtu), - m_incoming_buffer(max_mtu, 0), + m_packet_buffers(max_packet_queue_depth, std::vector(max_mtu, 0)), m_receive_callback(receive_callback) - { + for (size_t i = 0; i < m_packet_buffers.size(); ++i) + { + m_free_buffers.emplace_back(i); + } + m_rx_thread = std::thread(&UdpReceiver::rx_thread, this); m_dispatch_thread = std::thread(&UdpReceiver::dispatch_thread, this); } @@ -63,7 +68,7 @@ UdpReceiver::UdpReceiver(const NetworkSocket &socket, UdpReceiver::~UdpReceiver() { m_stop.store(true); - m_queue_cv.notify_all(); + m_queue_valid.notify_all(); if (m_rx_thread.joinable()) { @@ -101,6 +106,18 @@ void UdpReceiver::rx_thread() // while(true) { + size_t buffer_index = 0; + { + std::lock_guard lock(m_queue_mutex); + if (m_free_buffers.empty()) + { + ++m_dropped_packets; + continue; + } + + buffer_index = m_free_buffers.front(); + m_free_buffers.pop_front(); + } try { @@ -108,9 +125,11 @@ void UdpReceiver::rx_thread() #pragma warning (push) #pragma warning (disable : 4267) #endif + auto& buffer = m_packet_buffers[buffer_index]; + buffer.resize(m_max_mtu); const int bytes_read = recvfrom(m_socket, - reinterpret_cast(m_incoming_buffer.data()), - m_incoming_buffer.size(), + reinterpret_cast(buffer.data()), + buffer.size(), 0, NULL, NULL); #if defined(WIN32) && !defined(__MINGW64__) #pragma warning (pop) @@ -120,24 +139,19 @@ void UdpReceiver::rx_thread() // if (bytes_read < 0) { + std::lock_guard lock(m_queue_mutex); + m_free_buffers.emplace_back(buffer_index); break; } - std::vector packet(static_cast(bytes_read)); - memcpy(packet.data(), m_incoming_buffer.data(), static_cast(bytes_read)); + buffer.resize(static_cast(bytes_read)); { std::lock_guard lock(m_queue_mutex); - if (m_packet_queue.size() >= m_max_packet_queue_depth) - { - m_packet_queue.pop_front(); - CRL_DEBUG("UDP receiver queue overrun, dropping oldest packet\n"); - } - - m_packet_queue.emplace_back(std::move(packet)); + m_ready_buffers.emplace_back(buffer_index); } - m_queue_cv.notify_one(); + m_queue_valid.notify_one(); } catch (const std::exception& e) { @@ -155,25 +169,26 @@ void UdpReceiver::dispatch_thread() { while(true) { - std::vector packet; + size_t buffer_index = 0; { std::unique_lock lock(m_queue_mutex); - m_queue_cv.wait(lock, [this]() + m_queue_valid.wait(lock, [this]() { - return m_stop || !m_packet_queue.empty(); + return m_stop || !m_ready_buffers.empty(); }); - if (m_stop && m_packet_queue.empty()) + if (m_stop && m_ready_buffers.empty()) { break; } - packet = std::move(m_packet_queue.front()); - m_packet_queue.pop_front(); + buffer_index = m_ready_buffers.front(); + m_ready_buffers.pop_front(); } try { + auto& packet = m_packet_buffers[buffer_index]; m_receive_callback(packet); } catch (const std::exception& e) @@ -184,6 +199,12 @@ void UdpReceiver::dispatch_thread() { CRL_DEBUG_RAW("unknown exception while decoding packet\n"); } + + { + std::lock_guard lock(m_queue_mutex); + m_packet_buffers[buffer_index].resize(m_max_mtu); + m_free_buffers.emplace_back(buffer_index); + } } } diff --git a/source/LibMultiSense/include/details/legacy/storage.hh b/source/LibMultiSense/include/details/legacy/storage.hh index b5b9f615..2c5f2a17 100644 --- a/source/LibMultiSense/include/details/legacy/storage.hh +++ b/source/LibMultiSense/include/details/legacy/storage.hh @@ -55,7 +55,7 @@ struct BufferPoolConfig /// @brief Object to handle the management and delivery of buffers to used to store incoming data without /// needing to continually reallocate internal memory. This class is threadsafe /// -class BufferPool +class BufferPool : public std::enable_shared_from_this { public: @@ -90,20 +90,48 @@ public: private: + enum class BufferType + { + Small, + Large + }; + + /// + /// @brief Acquire a buffer from the pool + /// + std::shared_ptr> acquire_buffer(BufferType type, size_t target_size); + + /// + /// @brief Release a buffer back to the bool + /// + void release_buffer(BufferType type, size_t index, std::vector* buffer); + /// /// @brief The configured numbers and sizes of our internal buffers /// BufferPoolConfig m_config; + mutable std::mutex m_mutex; + + /// + /// @brief The collection of small buffers which are in use + /// + std::vector> m_small_buffers; + + /// + /// @brief The collection of small buffers which are free to use + /// + std::vector m_small_free_list; + /// - /// @brief The collection of small buffers + /// @brief The collection of large buffers which are in use /// - std::vector>> m_small_buffers; + std::vector> m_large_buffers; /// - /// @brief The collection of large buffers + /// @brief The collection of large buffers which are free to use /// - std::vector>> m_large_buffers; + std::vector m_large_free_list; }; diff --git a/source/LibMultiSense/include/details/legacy/udp.hh b/source/LibMultiSense/include/details/legacy/udp.hh index 5b05a179..39ad1a3c 100644 --- a/source/LibMultiSense/include/details/legacy/udp.hh +++ b/source/LibMultiSense/include/details/legacy/udp.hh @@ -113,20 +113,15 @@ private: std::atomic_bool m_stop{false}; /// - /// @brief condition variable to notify dispatch there is data on the queue + /// @brief Signal for coordinating when the processing queue is valid /// - std::condition_variable m_queue_cv; + std::condition_variable m_queue_valid; /// - /// @brief condition variable to notify dispatch there is data on the queue + /// @brief Mutext for coordinating when data is ready to process /// std::mutex m_queue_mutex; - /// - /// @brief queue used to send data between the rx and dispatch threads - /// - std::deque> m_packet_queue; - /// /// @brief the max size of the m_packet_queue /// @@ -138,14 +133,30 @@ private: size_t m_max_mtu = 0; /// - /// @brief Internal buffer used to write incoming UDP data into + /// @brief queue used to send data between the rx and dispatch threads. The queue stores indices + /// into m_packet_buffers to avoid copying the payload + /// + std::vector> m_packet_buffers; + + /// + /// @brief indices of buffers which are free to write to in m_packet_buffers /// - std::vector m_incoming_buffer; + std::deque m_free_buffers; + + /// + /// @brief indices of buffers which are ready to process in m_packet_buffers + /// + std::deque m_ready_buffers; /// /// @brief User specified callback which is called once UDP data is received /// std::function&)> m_receive_callback; + + /// + /// @brief Counter for messages dropped due to dispatch processing slowdowsn + /// + std::atomic_size_t m_dropped_packets = 0; }; From b1047586963e1bc0221e59d8cf08b85357ea917f Mon Sep 17 00:00:00 2001 From: Matt Alvarado Date: Mon, 27 Oct 2025 15:56:14 -0400 Subject: [PATCH 3/7] Fix unit tests. Add indication of dropped packets --- source/LibMultiSense/details/legacy/channel.cc | 3 ++- .../include/MultiSense/MultiSenseTypes.hh | 6 ++++++ .../include/details/legacy/storage.hh | 3 +++ .../include/details/legacy/udp.hh | 5 +++++ source/LibMultiSense/test/storage_test.cc | 18 +++++++++--------- 5 files changed, 25 insertions(+), 10 deletions(-) diff --git a/source/LibMultiSense/details/legacy/channel.cc b/source/LibMultiSense/details/legacy/channel.cc index dce32531..3e15e143 100644 --- a/source/LibMultiSense/details/legacy/channel.cc +++ b/source/LibMultiSense/details/legacy/channel.cc @@ -727,7 +727,8 @@ std::optional LegacyChannel::get_system_status() const auto message_stats = m_message_assembler.get_message_statistics(); MultiSenseStatus::ClientNetworkStatus client_stats{message_stats.received_messages, message_stats.dropped_messages, - message_stats.invalid_packets}; + message_stats.invalid_packets, + (m_udp_receiver ? m_udp_receiver->dropped_packets() : 0)}; return MultiSenseStatus{system_ok(status->message), (ptp_status ? std::make_optional(convert(ptp_status.value())) : std::nullopt), diff --git a/source/LibMultiSense/include/MultiSense/MultiSenseTypes.hh b/source/LibMultiSense/include/MultiSense/MultiSenseTypes.hh index ffe10cbd..07497db2 100644 --- a/source/LibMultiSense/include/MultiSense/MultiSenseTypes.hh +++ b/source/LibMultiSense/include/MultiSense/MultiSenseTypes.hh @@ -1217,6 +1217,12 @@ struct MultiSenseStatus /// @brief The total number of invalid packets received on the client side /// size_t invalid_packets = 0; + + /// + /// @brief The total number of packets we recieved over the wire, but were unable to process due to + /// buffer limtis + /// + size_t unprocessed_packets = 0; }; struct TimeStatus diff --git a/source/LibMultiSense/include/details/legacy/storage.hh b/source/LibMultiSense/include/details/legacy/storage.hh index 2c5f2a17..9a8195d0 100644 --- a/source/LibMultiSense/include/details/legacy/storage.hh +++ b/source/LibMultiSense/include/details/legacy/storage.hh @@ -111,6 +111,9 @@ private: /// BufferPoolConfig m_config; + /// + /// @brief Handle acquisition of buffers in a thread safe manner + /// mutable std::mutex m_mutex; /// diff --git a/source/LibMultiSense/include/details/legacy/udp.hh b/source/LibMultiSense/include/details/legacy/udp.hh index 39ad1a3c..99288ff1 100644 --- a/source/LibMultiSense/include/details/legacy/udp.hh +++ b/source/LibMultiSense/include/details/legacy/udp.hh @@ -80,6 +80,11 @@ public: ~UdpReceiver(); + size_t dropped_packets() const + { + return m_dropped_packets; + } + private: /// diff --git a/source/LibMultiSense/test/storage_test.cc b/source/LibMultiSense/test/storage_test.cc index d5a05b9f..84a79cab 100644 --- a/source/LibMultiSense/test/storage_test.cc +++ b/source/LibMultiSense/test/storage_test.cc @@ -42,17 +42,17 @@ using namespace multisense::legacy; TEST(BufferPool, null_construction) { - BufferPool pool(BufferPoolConfig{0, 0, 0, 0}); + auto pool = std::make_shared(BufferPoolConfig{0, 0, 0, 0}); - ASSERT_EQ(pool.get_buffer(1), nullptr); + ASSERT_EQ(pool->get_buffer(1), nullptr); } TEST(BufferPool, valid_construction) { - BufferPool pool(BufferPoolConfig{1, 10, 1, 30}); + auto pool = std::make_shared(BufferPoolConfig{1, 10, 1, 30}); - const auto small_buffer = pool.get_buffer(2); - const auto large_buffer = pool.get_buffer(20); + const auto small_buffer = pool->get_buffer(2); + const auto large_buffer = pool->get_buffer(20); ASSERT_NE(small_buffer, nullptr); ASSERT_NE(large_buffer, nullptr); @@ -60,16 +60,16 @@ TEST(BufferPool, valid_construction) // // At this point we should be out of buffers // - ASSERT_EQ(pool.get_buffer(20), nullptr); - ASSERT_EQ(pool.get_buffer(2), nullptr); + ASSERT_EQ(pool->get_buffer(20), nullptr); + ASSERT_EQ(pool->get_buffer(2), nullptr); } TEST(BufferPool, buffer_to_large) { - BufferPool pool(BufferPoolConfig{1, 10, 1, 30}); + auto pool = std::make_shared(BufferPoolConfig{1, 10, 1, 30}); // // We should have no buffer of size 40 // - ASSERT_EQ(pool.get_buffer(40), nullptr); + ASSERT_EQ(pool->get_buffer(40), nullptr); } From 1d051cc061b058f67a976ed6b7b950a595c63e92 Mon Sep 17 00:00:00 2001 From: Matt Alvarado Date: Mon, 27 Oct 2025 16:00:30 -0400 Subject: [PATCH 4/7] Fix spelling --- source/LibMultiSense/include/MultiSense/MultiSenseTypes.hh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/LibMultiSense/include/MultiSense/MultiSenseTypes.hh b/source/LibMultiSense/include/MultiSense/MultiSenseTypes.hh index 07497db2..ad099303 100644 --- a/source/LibMultiSense/include/MultiSense/MultiSenseTypes.hh +++ b/source/LibMultiSense/include/MultiSense/MultiSenseTypes.hh @@ -1219,8 +1219,8 @@ struct MultiSenseStatus size_t invalid_packets = 0; /// - /// @brief The total number of packets we recieved over the wire, but were unable to process due to - /// buffer limtis + /// @brief The total number of packets we received over the wire, but were unable to process due to + /// buffer limits /// size_t unprocessed_packets = 0; }; From 8bbce955644fe0ade4f78240e0d5777d6714f468 Mon Sep 17 00:00:00 2001 From: Matt Alvarado Date: Mon, 3 Nov 2025 23:10:10 -0500 Subject: [PATCH 5/7] Small function refactor --- .../LibMultiSense/details/legacy/storage.cc | 80 ++++++++++--------- 1 file changed, 42 insertions(+), 38 deletions(-) diff --git a/source/LibMultiSense/details/legacy/storage.cc b/source/LibMultiSense/details/legacy/storage.cc index ba005b6d..73f11902 100644 --- a/source/LibMultiSense/details/legacy/storage.cc +++ b/source/LibMultiSense/details/legacy/storage.cc @@ -44,53 +44,57 @@ namespace multisense{ namespace legacy{ namespace { - constexpr size_t NUM_ALLOCATION_RETRIES = 5; -} -BufferPool::BufferPool(const BufferPoolConfig &config): - m_config(config) +constexpr size_t NUM_ALLOCATION_RETRIES = 5; + +std::pair< + std::vector>, + std::vector> +allocate_buffers( size_t count, size_t buffer_size) { + std::vector> storage{}; + std::vector free_list{}; - const auto allocate_buffers = - [](std::vector>& storage, - std::vector& free_list, - size_t count, - size_t buffer_size, - const char* debug_label) + storage.reserve(count); + for (size_t i = 0 ; i < count ; ++i) + { + bool allocated = false; + for (size_t attempt = 0; attempt < NUM_ALLOCATION_RETRIES; ++attempt) { - storage.reserve(count); - for (size_t i = 0 ; i < count ; ++i) + try { - bool allocated = false; - for (size_t attempt = 0; attempt < NUM_ALLOCATION_RETRIES; ++attempt) - { - try - { - storage.emplace_back(buffer_size, 0); - allocated = true; - break; - } - catch(const std::exception &e) - { - CRL_DEBUG("Failed to allocate %s buffer: %s. Retrying\n", debug_label, e.what()); - } - } - - if (!allocated) - { - CRL_EXCEPTION("Failed to allocate %s buffers", debug_label); - } + storage.emplace_back(buffer_size, 0); + allocated = true; + break; } - - free_list.reserve(count); - for (size_t i = 0 ; i < count ; ++i) + catch(const std::exception &e) { - free_list.emplace_back(i); + CRL_DEBUG("Failed to allocate buffers: %s. Retrying\n", e.what()); } - }; + } - allocate_buffers(m_small_buffers, m_small_free_list, config.num_small_buffers, config.small_buffer_size, "small"); - allocate_buffers(m_large_buffers, m_large_free_list, config.num_large_buffers, config.large_buffer_size, "large"); + if (!allocated) + { + CRL_EXCEPTION("Failed to allocate buffers"); + } + } + + free_list.reserve(count); + for (size_t i = 0 ; i < count ; ++i) + { + free_list.emplace_back(i); + } + + return std::make_pair(std::move(storage), std::move(free_list)); +} + +} + +BufferPool::BufferPool(const BufferPoolConfig &config): + m_config(config) +{ + std::tie(m_small_buffers, m_small_free_list) = allocate_buffers(config.num_small_buffers, config.small_buffer_size); + std::tie(m_large_buffers, m_large_free_list) = allocate_buffers(config.num_large_buffers, config.large_buffer_size); } std::shared_ptr> BufferPool::get_buffer(size_t target_size) From aabf29354c54c72eacf8e7aca3cc0beb073bc4a4 Mon Sep 17 00:00:00 2001 From: Matt Alvarado Date: Fri, 7 Nov 2025 15:23:18 -0500 Subject: [PATCH 6/7] Delete move constructors --- source/LibMultiSense/include/details/legacy/storage.hh | 6 ------ 1 file changed, 6 deletions(-) diff --git a/source/LibMultiSense/include/details/legacy/storage.hh b/source/LibMultiSense/include/details/legacy/storage.hh index 9a8195d0..deed5f78 100644 --- a/source/LibMultiSense/include/details/legacy/storage.hh +++ b/source/LibMultiSense/include/details/legacy/storage.hh @@ -69,12 +69,6 @@ public: BufferPool(const BufferPool&) = delete; BufferPool& operator=(const BufferPool&) = delete; - /// - /// Movable - /// - BufferPool(BufferPool&&) noexcept = default; - BufferPool& operator=(BufferPool&&) noexcept = default; - /// /// @brief Get a buffer which will contain at least target_size bytes of storage /// From 2e0856d4711656b38c97c13153b3b1f2ff909b5f Mon Sep 17 00:00:00 2001 From: Matt Alvarado Date: Fri, 7 Nov 2025 15:28:12 -0500 Subject: [PATCH 7/7] Remove unused variable --- source/LibMultiSense/details/legacy/udp.cc | 1 - source/LibMultiSense/include/details/legacy/udp.hh | 5 ----- 2 files changed, 6 deletions(-) diff --git a/source/LibMultiSense/details/legacy/udp.cc b/source/LibMultiSense/details/legacy/udp.cc index a5d033b2..774f5a93 100644 --- a/source/LibMultiSense/details/legacy/udp.cc +++ b/source/LibMultiSense/details/legacy/udp.cc @@ -51,7 +51,6 @@ UdpReceiver::UdpReceiver(const NetworkSocket &socket, size_t max_packet_queue_depth): m_socket(socket.sensor_socket), m_stop(false), - m_max_packet_queue_depth(max_packet_queue_depth), m_max_mtu(max_mtu), m_packet_buffers(max_packet_queue_depth, std::vector(max_mtu, 0)), m_receive_callback(receive_callback) diff --git a/source/LibMultiSense/include/details/legacy/udp.hh b/source/LibMultiSense/include/details/legacy/udp.hh index 99288ff1..3f329d2f 100644 --- a/source/LibMultiSense/include/details/legacy/udp.hh +++ b/source/LibMultiSense/include/details/legacy/udp.hh @@ -127,11 +127,6 @@ private: /// std::mutex m_queue_mutex; - /// - /// @brief the max size of the m_packet_queue - /// - size_t m_max_packet_queue_depth = 64; - /// /// @brief The amount of data to read off the socket during each read operation ///