Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ cmake_minimum_required (VERSION 3.12)
#########

cmake_policy( SET CMP0048 NEW ) # version in project()
project( Dripline VERSION 2.10.8 )
project( Dripline VERSION 2.10.9 )

list( APPEND CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/scarab/cmake )

Expand Down
10 changes: 10 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ RUN cd /usr/local && \
cd / && \
rm -rf /usr/local/${pybind11_name}

FROM base AS devel

RUN apt-get update && \
apt-get clean && \
apt-get --fix-missing -y install \
nano \
gdb \
valgrind \
cmake-curses-gui

FROM base

# note that the build dir is *not* in source, this is so that the source can me mounted onto the container without covering the build target
Expand Down
8 changes: 8 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ Types of changes: Added, Changed, Deprecated, Removed, Fixed, Security
## [Unreleased]


## [2.10.9] - 2025-12-02

### Fixed

- dl-agent was stuck in dry-run mode due to a change in configuration; this is fixed by changing the logic to set the agent in dry-run mode
- A long-standing memory leak in which messages were being kept by the receiver instead of being erased once processed is fixed


## [2.10.8] - 2025-11-04

### Changed
Expand Down
2 changes: 1 addition & 1 deletion library/agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ namespace dripline
// check if this is meant to be a dry run message
if( t_config.has( "dry_run_msg" ) )
{
f_agent->set_is_dry_run( t_config["dry_run_msg"]().as_bool() );
t_config.erase( "dry_run_msg" );
f_agent->set_is_dry_run( true );
}

this->create_and_send_message( t_config, t_core );
Expand Down
7 changes: 4 additions & 3 deletions library/receiver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,13 @@ namespace dripline
LDEBUG( dlog, "Received a message chunk <" << t_message->MessageId() );

auto t_parsed_message_id = message::parse_message_id( t_message->MessageId() );
if( incoming_messages().count( std::get<0>(t_parsed_message_id) ) == 0 )
std::string t_message_id( std::get<0>(t_parsed_message_id) );
if( incoming_messages().count( t_message_id ) == 0 )
{
// this path: first chunk for this message
LDEBUG( dlog, "This is the first chunk for this message; creating new message pack" );
// create the new message_pack object
incoming_message_pack& t_pack = incoming_messages()[std::get<0>(t_parsed_message_id)];
incoming_message_pack& t_pack = incoming_messages()[t_message_id];
// set the f_messages vector to the expected size
t_pack.f_messages.resize( std::get<2>(t_parsed_message_id) );
// put in place the first message chunk received
Expand All @@ -84,7 +85,7 @@ namespace dripline
{
// if we only expect one chunk, we can bypass creating a separate thread, etc
LDEBUG( dlog, "Single-chunk message being sent directly to processing" );
process_message_pack( t_pack, t_message->MessageId() );
process_message_pack( t_pack, t_message_id );
}
else
{
Expand Down
1 change: 1 addition & 0 deletions testing/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ set( testing_SOURCES
run_dl_tests.cc
test_agent.cc
test_amqp.cc
test_concurrent_receiver.cc
test_core.cc
test_dripline_error.cc
test_endpoint.cc
Expand Down
67 changes: 67 additions & 0 deletions testing/test_concurrent_receiver.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* test_concurrent_receiver.cc
*
* Created on: Nov 19, 2025
* Author: N.S. Oblath
*/

#include "dripline_exceptions.hh"
#include "message.hh"
#include "receiver.hh"

#include "param_node.hh"

#include "catch2/catch_test_macros.hpp"

#include <chrono>
#include <future>
#include <thread>

namespace dripline
{
class concurrent_receiver_tester : public concurrent_receiver
{
public:
using concurrent_receiver::concurrent_receiver;

void submit_message( message_ptr_t )
{}
};
}

TEST_CASE( "cr_process_message", "[concurrent_receiver]" )
{
dripline::concurrent_receiver_tester t_concrecv;

dripline::request_ptr_t t_request_ptr = dripline::msg_request::create( scarab::param_ptr_t( new scarab::param() ), dripline::op_t::get, "dlcpp_service", "", "" );

// we process the message before executing the concurrent_receiver.
// this means the message will be queued.
t_concrecv.process_message( t_request_ptr );
t_concrecv.process_message( t_request_ptr );
t_concrecv.process_message( t_request_ptr );
t_concrecv.process_message( t_request_ptr );
t_concrecv.process_message( t_request_ptr );
t_concrecv.process_message( t_request_ptr );
t_concrecv.process_message( t_request_ptr );
t_concrecv.process_message( t_request_ptr );
t_concrecv.process_message( t_request_ptr );
t_concrecv.process_message( t_request_ptr );
REQUIRE( t_concrecv.message_queue().size() == 10 );

// here we launch the execution asynchronously.
// we'll give it 1 second to execute, which should be enough, though in principle it's not a 100% guarantee that it'll be done in time.
// we then cancel the service and move on to verify that the queue is empty.
auto t_do_execute = [&](){ t_concrecv.concurrent_receiver::execute(); };
auto t_exe_future = std::async(std::launch::async, t_do_execute);
std::this_thread::sleep_for( std::chrono::seconds(1) );
t_concrecv.cancel();
t_exe_future.wait();

REQUIRE( t_concrecv.message_queue().empty() );

}