From ea6466bce55e9b9c81eb06fea01ebec3aee785db Mon Sep 17 00:00:00 2001 From: MarioIvancik Date: Wed, 1 Oct 2025 15:42:34 +0200 Subject: [PATCH 01/10] added missing documentation; fixed examples; additional CMakeLists checks --- CMakeLists.txt | 12 +- README.md | 45 ++++--- cmake/Dependencies.cmake | 1 - examples/CMakeLists.txt | 12 +- examples/README.md | 22 ++++ examples/main.cpp | 41 ------ examples/main_consumer.cpp | 80 ++++++++++++ examples/main_driver.cpp | 11 ++ examples/main_producer.cpp | 56 +++++++++ .../AsyncFunctionExecutor.hpp | 78 +++++++++--- .../async_function_execution/Constants.hpp | 4 + .../TimeoutIdleStrategy.hpp | 118 +++++++----------- .../clients/AeronClient.hpp | 52 ++++---- .../clients/ClientInterface.hpp | 4 +- .../structures/Settings.hpp | 38 +++--- .../TimeoutIdleStrategy.cpp | 65 ++++++++++ test/CMakeLists.txt | 4 +- test/README.md | 4 +- test/include/AsyncFunctionExecutorTests.hpp | 28 ++--- test/include/MockClient.hpp | 6 +- .../main_segfault_consumer.cpp | 6 +- .../main_segfault_producer.cpp | 6 +- .../main_simple_function.cpp | 10 +- test/source/AsyncFunctionExecutorTests.cpp | 12 +- 24 files changed, 469 insertions(+), 246 deletions(-) create mode 100644 examples/README.md delete mode 100644 examples/main.cpp create mode 100644 examples/main_consumer.cpp create mode 100644 examples/main_driver.cpp create mode 100644 examples/main_producer.cpp create mode 100644 source/bringauto/async_function_execution/TimeoutIdleStrategy.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index fd8ae41..5092016 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,7 +4,7 @@ PROJECT(async-function-execution C CXX) SET(CMAKE_POSITION_INDEPENDENT_CODE ON) SET(CMAKE_CXX_STANDARD 20) -SET(ASYNC_FUNCTION_EXECUTION_VERSION 0.1.0) +SET(ASYNC_FUNCTION_EXECUTION_VERSION 1.0.0) FIND_PACKAGE(CMLIB COMPONENTS CMDEF CMUTIL @@ -14,7 +14,6 @@ FIND_PACKAGE(CMLIB INCLUDE(GNUInstallDirs) SET(ASYNC_FUNCTION_EXECUTION_TARGET_NAME async-function-execution) -SET(ASYNC_FUNCTION_EXECUTION_ALIAS_NAME ${PROJECT_NAME}::async-function-execution) OPTION(BRINGAUTO_SAMPLES OFF) OPTION(BRINGAUTO_PACKAGE "Package creation" OFF) @@ -38,7 +37,6 @@ IF (BRINGAUTO_PACKAGE) ENDIF () FIND_PACKAGE(aeron 1.48.6 REQUIRED) -FIND_PACKAGE(nlohmann_json 3.2.0 REQUIRED) FILE(GLOB_RECURSE source_files ${CMAKE_CURRENT_LIST_DIR}/source/*) @@ -51,19 +49,15 @@ CMDEF_ADD_LIBRARY( VERSION ${ASYNC_FUNCTION_EXECUTION_VERSION} ) -# I guess this is not needed when we use package? -#ADD_LIBRARY(${ASYNC_FUNCTION_EXECUTION_ALIAS_NAME} ALIAS "${ASYNC_FUNCTION_EXECUTION_TARGET_NAME}-shared") - TARGET_LINK_LIBRARIES(${ASYNC_FUNCTION_EXECUTION_TARGET_NAME}-shared PUBLIC aeron::aeron aeron::aeron_client aeron::aeron_driver - nlohmann_json::nlohmann_json ) IF(BRINGAUTO_TESTS) ENABLE_TESTING() - INCLUDE(${CMAKE_CURRENT_SOURCE_DIR}/test/CMakeLists.txt) + ADD_SUBDIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/test) INCLUDE(CTest) ENDIF(BRINGAUTO_TESTS) @@ -85,5 +79,5 @@ IF (BRINGAUTO_PACKAGE) ENDIF () IF (BRINGAUTO_SAMPLES) - ADD_SUBDIRECTORY(${CMAKE_CURRENT_LIST_DIR}/examples/) + ADD_SUBDIRECTORY(${CMAKE_CURRENT_LIST_DIR}/examples) ENDIF () diff --git a/README.md b/README.md index 8e48b8f..b467fd3 100644 --- a/README.md +++ b/README.md @@ -21,13 +21,37 @@ AsyncFunctionExecutor executorProducer { Config { .isProducer = true, // decides the mode of the executor .defaultTimeout = std::chrono::seconds(1) // polling timeout (should only be used when producer) + .functionConfigurations = structures::FuntionConfigs { { + { 1, { std::chrono::seconds(2) }} + } } }, - FunctionList { std::tuple{ // list of all functions + FunctionList { // list of all functions FunctionAdd - } } + } }; ``` +#### Post initialization + +Before using any functions, connection needs to be established using the connect function: + +```cpp +executorProducer.connect(); +``` + +#### functionConfigurations + +The functionConfigurations parameter accepts an unordered map representing per function configurations. Syntax: + +```cpp +{ + { , { } } +} +``` + +Supported parameters: + - timeout: replaces the default timeout value for that function (in nanoseconds) + ### Producer Producer is the side calling functions and waiting for a response from the consumer. If timeout is provided in config, the function will throw if it doesn't execute in time. Example of function calling: @@ -79,27 +103,16 @@ If a producer expects a return value where returned bytes are used directly, the ## Requirements -- [cmlib](https://github.com/cmakelib/cmakelib) - [aeron](https://github.com/aeron-io/aeron) +- [cmlib](https://github.com/cmakelib/cmakelib) + - the CMLIB_DIR env value has to be set -### Aeron setup - -Build and install aeron to any folder. - -```bash -git clone https://github.com/aeron-io/aeron.git -cd aeron -git checkout 1.48.5 -./cppbuild/cppbuild -cd cppbuild/Release -cmake --install . --prefix -``` ## Build ```bash mkdir -p _build && cd _build -cmake ../ -DCMLIB_DIR= -DCMAKE_PREFIX_PATH= +cmake ../ make ``` diff --git a/cmake/Dependencies.cmake b/cmake/Dependencies.cmake index 6147af7..c0e8103 100644 --- a/cmake/Dependencies.cmake +++ b/cmake/Dependencies.cmake @@ -1,6 +1,5 @@ SET(CMAKE_FIND_USE_CMAKE_SYSTEM_PATH FALSE) -BA_PACKAGE_LIBRARY(nlohmann-json v3.10.5 NO_DEBUG ON) BA_PACKAGE_LIBRARY(aeron v1.48.6) IF (BRINGAUTO_TESTS) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index d679e08..93538c6 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1,5 +1,13 @@ CMAKE_MINIMUM_REQUIRED(VERSION 3.25 FATAL_ERROR) -ADD_EXECUTABLE(example main.cpp) +IF(NOT TARGET async-function-execution-shared) + MESSAGE(FATAL_ERROR "The async-function-execution-shared target was not found. Please build the example as part of the async-function-execution-shared project.") +ENDIF() -TARGET_LINK_LIBRARIES(example PUBLIC async-function-execution-shared) \ No newline at end of file +ADD_EXECUTABLE(example_producer main_producer.cpp) +ADD_EXECUTABLE(example_consumer main_consumer.cpp) +ADD_EXECUTABLE(example_driver main_driver.cpp) + +TARGET_LINK_LIBRARIES(example_producer PUBLIC async-function-execution-shared) +TARGET_LINK_LIBRARIES(example_consumer PUBLIC async-function-execution-shared) +TARGET_LINK_LIBRARIES(example_driver PUBLIC async-function-execution-shared) diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..c86b67a --- /dev/null +++ b/examples/README.md @@ -0,0 +1,22 @@ +# Async function execution example + +This folder contains a simple example of the usage of this project. 3 executables will be built: the aeron driver, the producer and the consumer. + +## Build + +Examples are built as part of the main project. Don't use the CMakeList in the test folder. + +```bash +mkdir -p _build_example && cd _build_example +cmake ../ -DBRINGAUTO_SAMPLES=ON +make +``` + +## Run + +```bash +# Run the executables in this order +./example_driver +./example_consumer +./example_producer +``` \ No newline at end of file diff --git a/examples/main.cpp b/examples/main.cpp deleted file mode 100644 index 0e409bb..0000000 --- a/examples/main.cpp +++ /dev/null @@ -1,41 +0,0 @@ -#include - -#include -#include - - -using namespace bringauto::async_function_execution; - -FunctionDefinition ExampleFunc1 { - FunctionId { 1 }, - Return { std::string {} }, - Arguments { int {}, std::string {}, float {} } -}; - -FunctionDefinition ExampleFunc2 { - FunctionId { 2 }, - Return { std::string {} }, - Arguments { int {}, std::string {} } -}; - -FunctionDefinition ExampleFunc3 { - FunctionId { 3 }, - Return { std::string {} }, - Arguments { int {} } -}; - - -int main() { - AsyncFunctionExecutor executor { - Config { - .isProducer = true, - .defaultTimeout = std::chrono::seconds(1) - }, - FunctionList { std::tuple{ ExampleFunc1, ExampleFunc2, ExampleFunc3 } }, - }; - - std::cout << executor.callFunc(ExampleFunc1, 42, "Hello", 3.14f) << std::endl; - std::cout << executor.callFunc(ExampleFunc2, 100, "World") << std::endl; - std::cout << executor.callFunc(ExampleFunc3, 123) << std::endl; - return 0; -} diff --git a/examples/main_consumer.cpp b/examples/main_consumer.cpp new file mode 100644 index 0000000..4576d77 --- /dev/null +++ b/examples/main_consumer.cpp @@ -0,0 +1,80 @@ +#include +#include + + + +using namespace bringauto::async_function_execution; + +struct SerializableString final { + std::string value {}; + SerializableString() = default; + SerializableString(std::string str) : value(std::move(str)) {} + + std::span serialize() const { + return std::span {reinterpret_cast(value.data()), value.size()}; + } + void deserialize(std::span bytes) { + value = std::string {reinterpret_cast(bytes.data()), bytes.size()}; + } +}; + +FunctionDefinition ExampleFunc1 { + FunctionId { 1 }, + Return { SerializableString {} }, + Arguments { int {}, SerializableString {}, float {} } +}; + +FunctionDefinition ExampleFunc2 { + FunctionId { 2 }, + Return { SerializableString {} }, + Arguments { int {}, SerializableString {} } +}; + +FunctionDefinition ExampleFunc3 { + FunctionId { 3 }, + Return { SerializableString {} }, + Arguments { int {} } +}; + + +int main() { + AsyncFunctionExecutor executor { + Config { + .isProducer = false, + }, + FunctionList { ExampleFunc1, ExampleFunc2, ExampleFunc3 }, + }; + + executor.connect(); + + while (true) { + auto [funcId, argBytes] = executor.pollFunction(); + + switch (funcId.value) { + case 1: { + auto [arg1, arg2, arg3] = executor.getFunctionArgs(ExampleFunc1, argBytes); + std::cout << "Consumer: Received Function 1 call with args (" << arg1 << ", " << arg2.value << ", " << arg3 << ")." << std::endl; + executor.sendReturnMessage(funcId, "Func 1 return value"); + break; + } + case 2: { + auto [arg1, arg2] = executor.getFunctionArgs(ExampleFunc2, argBytes); + std::cout << "Consumer: Received Function 2 call with args (" << arg1 << ", " << arg2.value << ")." << std::endl; + executor.sendReturnMessage(funcId, "Func 2 return value"); + break; + } + case 3: { + auto [arg1] = executor.getFunctionArgs(ExampleFunc3, argBytes); + SerializableString returnValue { "Result for input " + std::to_string(arg1) }; + std::cout << "Consumer: Received Function 3 call with args (" << arg1 << ")" << std::endl; + executor.sendReturnMessage(funcId, "Func 3 return value"); + break; + } + default: + std::cerr << "Consumer: Unknown function ID received: " << static_cast(funcId.value) << std::endl; + throw std::runtime_error("Unknown function ID"); + } + } + + return 0; +} diff --git a/examples/main_driver.cpp b/examples/main_driver.cpp new file mode 100644 index 0000000..b39643c --- /dev/null +++ b/examples/main_driver.cpp @@ -0,0 +1,11 @@ +#include + + + +using namespace bringauto::async_function_execution; + +int main() { + AeronDriver driver; + driver.run(); + return 0; +} diff --git a/examples/main_producer.cpp b/examples/main_producer.cpp new file mode 100644 index 0000000..c605f90 --- /dev/null +++ b/examples/main_producer.cpp @@ -0,0 +1,56 @@ +#include + + +using namespace bringauto::async_function_execution; + +struct SerializableString final { + std::string value {}; + SerializableString() = default; + SerializableString(std::string str) : value(std::move(str)) {} + + std::span serialize() const { + return std::span {reinterpret_cast(value.data()), value.size()}; + } + void deserialize(std::span bytes) { + value = std::string {reinterpret_cast(bytes.data()), bytes.size()}; + } +}; + +FunctionDefinition ExampleFunc1 { + FunctionId { 1 }, + Return { SerializableString {} }, + Arguments { int {}, SerializableString {}, float {} } +}; + +FunctionDefinition ExampleFunc2 { + FunctionId { 2 }, + Return { SerializableString {} }, + Arguments { int {}, SerializableString {} } +}; + +FunctionDefinition ExampleFunc3 { + FunctionId { 3 }, + Return { SerializableString {} }, + Arguments { int {} } +}; + + +int main() { + AsyncFunctionExecutor executor { + Config { + .isProducer = true, + .defaultTimeout = std::chrono::seconds(1) + }, + FunctionList { ExampleFunc1, ExampleFunc2, ExampleFunc3 }, + }; + + executor.connect(); + + auto result1 = executor.callFunc(ExampleFunc1, 42, "Hello", 3.14f); + std::cout << result1.value << std::endl; + auto result2 = executor.callFunc(ExampleFunc2, 100, "World"); + std::cout << result2.value << std::endl; + auto result3 = executor.callFunc(ExampleFunc3, 123); + std::cout << result3.value << std::endl; + return 0; +} diff --git a/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp b/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp index 5232514..1523908 100644 --- a/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp +++ b/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp @@ -19,13 +19,13 @@ namespace bringauto::async_function_execution { struct Config { bool isProducer = true; std::chrono::nanoseconds defaultTimeout = std::chrono::nanoseconds(0); - std::string_view functionConfigurations = ""; + structures::FunctionConfigs functionConfigurations {}; }; /** * @brief Unique identifier for a function in the AsyncFunctionExecutor. - * Supported range is 0-255 + * value: The function ID value. Supported range is 0-255. */ struct FunctionId { const uint8_t value; @@ -43,6 +43,7 @@ concept HasSerialize = requires(const T& t) { /** * @brief Structure representing the return type of function. + * value: The return type value. */ template struct Return { @@ -62,6 +63,7 @@ struct Return { /** * @brief Structure representing the argument types of a function. + * values: A tuple holding the types of the function arguments. */ template struct Arguments { @@ -72,6 +74,9 @@ struct Arguments { /** * @brief Definition of a function that can be called or responded to via AsyncFunctionExecutor. + * id: Unique identifier for the function. + * returnType: The return type of the function. + * argumentTypes: The argument types of the function. */ template struct FunctionDefinition { @@ -82,15 +87,45 @@ struct FunctionDefinition { /** - * @brief Helper structure to hold a list of function definitions. + * @brief Helper type trait to identify FunctionDefinition types. */ -template +template +struct is_function_definition : std::false_type {}; + + +/** + * @brief Specialization of is_function_definition for FunctionDefinition types. + */ +template +struct is_function_definition> : std::true_type {}; + + +/** + * @brief Concept to ensure a type is a FunctionDefinition. + */ +template +concept IsFunctionDefinition = requires { typename std::decay_t; } && + is_function_definition>::value; + + +/** + * @brief Helper structure used to store function definitions. + * functions: A tuple holding all the function definitions. + */ +template struct FunctionList { const std::tuple functions; - FunctionList(Funcs&&... funcs) : functions(std::forward(funcs)...) {} + FunctionList(Funcs... funcs) : functions(std::move(funcs)...) {} }; +/** + * @brief Deduction guide for FunctionList to simplify its construction. + */ +template +FunctionList(Funcs&&...) -> FunctionList...>; + + /** * @brief This class provides a high-level interface for async communication, allowing function calls over shared memory * with serialization and deserialization. @@ -103,6 +138,7 @@ class AsyncFunctionExecutor { * * @param config Configuration for the async function executor. * @param functions A list of function definitions that the client can call or respond to. + * @param client Optional custom client implementing ClientInterface. If not provided, a default AeronClient is used. */ AsyncFunctionExecutor(Config config, const FunctionList &functions, @@ -117,7 +153,7 @@ class AsyncFunctionExecutor { ); } - for (const auto& [funcId, _] : settings_.functionConfigs) { + for (const auto& [funcId, _] : settings_.functionConfigs.configs) { if (!isFunctionDefined(FunctionId{funcId})) { throw std::runtime_error("Warning: Function ID " + std::to_string(static_cast(funcId)) + " in configuration is not defined in FunctionList."); } @@ -130,27 +166,28 @@ class AsyncFunctionExecutor { /** * @brief Connects the client to the media driver and sets up communication channels. * Needs to be called before any function calls or polling. + * + * @return Returns 0 on success, or a negative error code on failure. */ - void connect() { + int connect() { std::vector toProducer; std::vector fromProducer; - std::apply([&](auto&&... funcDefs) { + std::apply([&](const auto&... funcDefs) { (toProducer.push_back(funcDefs.id.value + MESSAGE_RETURN_CHANNEL_OFFSET), ...); (fromProducer.push_back(funcDefs.id.value), ...); - }, std::get<0>(functions_.functions)); + }, functions_.functions); if (settings_.isProducer) { - client_->connect(toProducer, fromProducer); - } else { - client_->connect(fromProducer, toProducer); + return client_->connect(toProducer, fromProducer); } + return client_->connect(fromProducer, toProducer); } /** * @brief Calls a function defined in the FunctionList, sending arguments and waiting for a response. - * Can only be used in producer mode. + * Can only be used in producer mode. Throws on error. * * @param function The function definition of which function to call. * @param args The arguments to pass to the function. @@ -172,7 +209,11 @@ class AsyncFunctionExecutor { auto messageBytes = serializeArgs(function.id, args...); client_->sendMessage(function.id.value, messageBytes); - auto responseBytes = client_->waitForMessage(function.id.value + MESSAGE_RETURN_CHANNEL_OFFSET); + auto responseBytes = client_->waitForMessage(function.id.value + MESSAGE_RETURN_CHANNEL_OFFSET, + settings_.functionConfigs.configs.contains(function.id.value) ? + settings_.functionConfigs.configs[function.id.value].timeout : + settings_.defaultTimeout + ); if (responseBytes.empty()) { throw std::runtime_error("No response received or timeout"); } @@ -211,7 +252,7 @@ class AsyncFunctionExecutor { /** * @brief Deserializes function arguments from a byte span into a tuple of argument values. - * Can only be used in consumer mode. + * Can only be used in consumer mode. Throws on error. * * @param function The function definition corresponding to the arguments. * @param argBytes The byte span containing the serialized arguments. @@ -289,9 +330,9 @@ class AsyncFunctionExecutor { private: bool isFunctionDefined(const FunctionId &funcId) { bool found = false; - std::apply([&](auto&&... funcDefs) { + std::apply([&](const auto&... funcDefs) { ((funcDefs.id.value == funcId.value ? found = true : false), ...); - }, std::get<0>(functions_.functions)); + }, functions_.functions); return found; } @@ -299,7 +340,7 @@ class AsyncFunctionExecutor { std::span serializeArgs(const FunctionId &funcId, const Args&... args) { serializationBuffer_.clear(); std::size_t totalSize = 2; // Function ID + Argument count - ((totalSize += 2 + sizeof(args)), ...); // Each argument: size byte + data + ((totalSize += 2 + sizeof(args)), ...); // Each argument: size bytes + data serializationBuffer_.reserve(totalSize); serializationBuffer_.push_back(funcId.value); serializationBuffer_.push_back(static_cast(sizeof...(Args))); @@ -368,6 +409,7 @@ class AsyncFunctionExecutor { /// Buffer used for serialization of messages. mutable std::vector serializationBuffer_; + /// Client used for communication. Can be a custom implementation of ClientInterface. std::unique_ptr client_; structures::Settings settings_; FunctionList functions_; diff --git a/include/bringauto/async_function_execution/Constants.hpp b/include/bringauto/async_function_execution/Constants.hpp index ecdfaed..78c5ad7 100644 --- a/include/bringauto/async_function_execution/Constants.hpp +++ b/include/bringauto/async_function_execution/Constants.hpp @@ -6,10 +6,14 @@ namespace bringauto::async_function_execution { +/// Maximum number of fragments to process in a single poll operation. constexpr int POLL_FRAGMENTS_LIMIT = 10; +/// Offset added to function ID to determine the return message channel ID. constexpr int MESSAGE_RETURN_CHANNEL_OFFSET = 1000; +/// Maximum size for serialized function arguments. constexpr int MAX_ARGUMENT_SIZE = 65535; +/// Default Aeron connection string for communication over shared memory. constexpr std::string_view DEFAULT_AERON_CONNECTION = "aeron:ipc"; } diff --git a/include/bringauto/async_function_execution/TimeoutIdleStrategy.hpp b/include/bringauto/async_function_execution/TimeoutIdleStrategy.hpp index 39a1b92..63efa5f 100644 --- a/include/bringauto/async_function_execution/TimeoutIdleStrategy.hpp +++ b/include/bringauto/async_function_execution/TimeoutIdleStrategy.hpp @@ -11,94 +11,63 @@ namespace bringauto::async_function_execution { -enum class IdleState { - NOT_IDLE = 0, - SPINNING = 1, - YIELDING = 2, - PARKING = 3 -}; - class TimeoutIdleStrategy { public: + /** + * @brief Constructs a TimeoutIdleStrategy instance. + * + * @param timeoutNs Optional default timeout duration for idling operations. If set to zero, no timeout is applied. + * @param maxSpinPeriodNs Maximum duration to spend in the SPINNING state. Default is 1 second. + * @param maxYieldPeriodNs Maximum duration to spend in the YIELDING state. Default is 2 seconds. + * @param minParkPeriodNs Minimum duration to sleep in the PARKING state. Default is 1 microsecond. + * @param maxParkPeriodNs Maximum duration to sleep in the PARKING state. Default is 1 millisecond. + */ explicit TimeoutIdleStrategy( std::chrono::nanoseconds timeoutNs = std::chrono::nanoseconds(0), std::chrono::nanoseconds maxSpinPeriodNs = std::chrono::duration(1000), std::chrono::nanoseconds maxYieldPeriodNs = std::chrono::duration(2000), std::chrono::nanoseconds minParkPeriodNs = std::chrono::nanoseconds(1000), std::chrono::nanoseconds maxParkPeriodNs = std::chrono::duration(1) - ) : prePad_(), - maxSpinPeriodNs_(maxSpinPeriodNs), + ) : maxSpinPeriodNs_(maxSpinPeriodNs), maxYieldPeriodNs_(maxYieldPeriodNs), minParkPeriodNs_(minParkPeriodNs), maxParkPeriodNs_(maxParkPeriodNs), parkPeriodNs_(minParkPeriodNs), state_(IdleState::NOT_IDLE), - timeoutNs_(timeoutNs), - postPad_() {} - - int idle(const int workCount) { - if (workCount > 0) { - reset(); - return 0; - } - return idle(); - } - - void reset() { - parkPeriodNs_ = minParkPeriodNs_; - state_ = IdleState::NOT_IDLE; - startTime_ = std::chrono::steady_clock::time_point(); - } - - int idle() { - auto now = std::chrono::steady_clock::now(); - if (timeoutNs_ != std::chrono::nanoseconds(0)) { - if (startTime_ == std::chrono::steady_clock::time_point()) { - startTime_ = now; - } - if (std::chrono::duration_cast(now - startTime_) >= timeoutNs_) { - return -1; - } - } - - switch(state_) { - case IdleState::NOT_IDLE: - state_ = IdleState::SPINNING; - lastStateSwitchTime_ = now; - break; - - case IdleState::SPINNING: - aeron::concurrent::atomic::cpu_pause(); - if (std::chrono::duration_cast(now - lastStateSwitchTime_) >= maxSpinPeriodNs_) { - state_ = IdleState::YIELDING; - lastStateSwitchTime_ = now; - } - break; - - case IdleState::YIELDING: - std::this_thread::yield(); - if (std::chrono::duration_cast(now - lastStateSwitchTime_) >= maxYieldPeriodNs_) { - state_ = IdleState::PARKING; - parkPeriodNs_ = minParkPeriodNs_; - lastStateSwitchTime_ = now; - } - break; - - case IdleState::PARKING: - default: - std::this_thread::sleep_for(parkPeriodNs_); - parkPeriodNs_ = std::min(parkPeriodNs_ * 2, maxParkPeriodNs_); - break; - } - return 0; - } - - void setTimeout(std::chrono::nanoseconds timeoutNs) { - timeoutNs_ = timeoutNs; - } + timeoutNs_(timeoutNs) {} + + /** + * @brief Idles based on the current state and the provided timeout. + * + * @param workCount Number of work items processed since the last call. + * @param timeout Optional timeout duration. If not provided, the default timeout set in the + * configuration will be used. + * @return 0 if idling continues, -1 if the timeout has been reached. + */ + int idle(const int workCount, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(0)); + + /** + * @brief Resets the idle strategy to its initial state. + */ + void reset(); + +private: + /** + * @brief Enumeration of idle states. + */ + enum class IdleState { + /// Default state, no idling. + NOT_IDLE = 0, + /// Spinning state, actively checking for work. + SPINNING = 1, + /// Yielding state, giving up the CPU for a short duration. + YIELDING = 2, + /// Parking state, sleeping for a longer duration. + PARKING = 3 + }; + + int idle(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(0)); -protected: - std::uint8_t prePad_[aeron::util::BitUtil::CACHE_LINE_LENGTH]; std::chrono::nanoseconds maxSpinPeriodNs_; std::chrono::nanoseconds maxYieldPeriodNs_; std::chrono::nanoseconds minParkPeriodNs_; @@ -108,7 +77,6 @@ class TimeoutIdleStrategy { std::chrono::nanoseconds timeoutNs_; std::chrono::steady_clock::time_point startTime_; std::chrono::steady_clock::time_point lastStateSwitchTime_; - std::uint8_t postPad_[aeron::util::BitUtil::CACHE_LINE_LENGTH]; }; } diff --git a/include/bringauto/async_function_execution/clients/AeronClient.hpp b/include/bringauto/async_function_execution/clients/AeronClient.hpp index 9797056..0aedc49 100644 --- a/include/bringauto/async_function_execution/clients/AeronClient.hpp +++ b/include/bringauto/async_function_execution/clients/AeronClient.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -63,30 +64,36 @@ class AeronClient final : public ClientInterface { */ int connect(const std::vector& subscriptionIds, const std::vector& publicationIds) override { aeron_ = aeron::Aeron::connect(aeronContext_); - int64_t id; - for (const auto &pubId : publicationIds) { - id = aeron_->addPublication(aeronConnection_, pubId); - std::shared_ptr publication = aeron_->findPublication(id); - while (!publication) { - std::this_thread::yield(); - publication = aeron_->findPublication(id); - } - aeronPublications_[pubId] = publication; - } - for (const auto &subId : subscriptionIds) { - id = aeron_->addSubscription(aeronConnection_, subId); - std::shared_ptr subscription = aeron_->findSubscription(id); - while (!subscription) { - std::this_thread::yield(); - subscription = aeron_->findSubscription(id); + try { + for (const auto &pubId : publicationIds) { + id = aeron_->addPublication(aeronConnection_, pubId); + std::shared_ptr publication = aeron_->findPublication(id); + while (!publication) { + std::this_thread::yield(); + publication = aeron_->findPublication(id); + } + aeronPublications_[pubId] = publication; + } + + for (const auto &subId : subscriptionIds) { + id = aeron_->addSubscription(aeronConnection_, subId); + std::shared_ptr subscription = aeron_->findSubscription(id); + while (!subscription) { + std::this_thread::yield(); + subscription = aeron_->findSubscription(id); + } + aeronSubscriptions_[subId] = subscription; + aeronPolling_[subId] = false; } - aeronSubscriptions_[subId] = subscription; - aeronPolling_[subId] = false; + } catch (const std::exception &e) { + std::cerr << "Aeron connection error: " << e.what() << std::endl; + return -1; // Error: Aeron connection failed } if (aeronPublications_.empty() || aeronSubscriptions_.empty()) { + std::cerr << "Aeron connection error: No publications or subscriptions available" << std::endl; return -1; // Error: No publications or subscriptions available } return 0; @@ -99,7 +106,7 @@ class AeronClient final : public ClientInterface { * @param channelId The channel ID to send the message to. * @param messageBytes The message bytes to send. * @return Returns a positive number on success, or a negative error code on failure. - * (NOT_CONNECTED = -1, BACK_PRESSURED = -2, ADMIN_ACTION = -3, PUBLICATION_CLOSED = -4) + * (NOT_CONNECTED = -1, BACK_PRESSURED = -2, ADMIN_ACTION = -3, PUBLICATION_CLOSED = -4) */ int sendMessage(const uint32_t channelId, std::span &messageBytes) override { aeron::concurrent::AtomicBuffer srcBuffer(const_cast(messageBytes.data()), messageBytes.size()); @@ -111,13 +118,14 @@ class AeronClient final : public ClientInterface { * @brief Retrieves the last received message from Aeron. * * @param channelId The channel ID to wait for a message from. + * @param timeout Maximum time to wait for a message before timing out. * @return Bytes of the last message received. Returns an empty span on timeout or error. */ - std::span waitForMessage(const uint32_t channelId) override { + std::span waitForMessage(const uint32_t channelId, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(0)) override { aeronPolling_[channelId] = true; while (aeronPolling_[channelId]) { const int fragmentsRead = aeronSubscriptions_[channelId]->poll(*aeronHandler_, 10); - if(aeronIdleStrategy_->idle(fragmentsRead) != 0) { + if(aeronIdleStrategy_->idle(fragmentsRead, timeout) != 0) { aeronIdleStrategy_->reset(); return {}; // Error: Aeron message wait timed out } @@ -176,4 +184,4 @@ class AeronClient final : public ClientInterface { std::unordered_map aeronMessages_ {}; }; -} \ No newline at end of file +} diff --git a/include/bringauto/async_function_execution/clients/ClientInterface.hpp b/include/bringauto/async_function_execution/clients/ClientInterface.hpp index 23b9ca5..a0f7c92 100644 --- a/include/bringauto/async_function_execution/clients/ClientInterface.hpp +++ b/include/bringauto/async_function_execution/clients/ClientInterface.hpp @@ -3,6 +3,7 @@ #include #include #include +#include @@ -35,9 +36,10 @@ class ClientInterface { * @brief Waits for a message from the specified channel ID. * * @param channelId The channel ID to wait for a message from. + * @param timeout Maximum time to wait for a message before timing out. * @return Bytes of the last message received. Returns an empty span on timeout or error. */ - virtual std::span waitForMessage(const uint32_t channelId) = 0; + virtual std::span waitForMessage(const uint32_t channelId, std::chrono::nanoseconds timeout) = 0; /** * @brief Waits for any message from all channels. diff --git a/include/bringauto/async_function_execution/structures/Settings.hpp b/include/bringauto/async_function_execution/structures/Settings.hpp index 11a270e..9b1a654 100644 --- a/include/bringauto/async_function_execution/structures/Settings.hpp +++ b/include/bringauto/async_function_execution/structures/Settings.hpp @@ -1,7 +1,5 @@ #pragma once -#include - #include #include @@ -17,6 +15,16 @@ struct FunctionConfig { std::chrono::nanoseconds timeout = std::chrono::nanoseconds(0); }; +/** + * @brief Wrapper for multiple FunctionConfig instances, keyed by FunctionId. + * configs: Map of FunctionId to FunctionConfig. + */ +struct FunctionConfigs { + std::unordered_map configs; + FunctionConfigs() = default; + FunctionConfigs(std::unordered_map configs) : configs(std::move(configs)) {}; +}; + /** * @brief Configuration settings for the AsyncFunctionExecutor. * isProducer: If true, the instance acts as a producer (sending requests). @@ -27,27 +35,11 @@ struct FunctionConfig { struct Settings { const bool isProducer = true; const std::chrono::nanoseconds defaultTimeout = std::chrono::nanoseconds(0); - std::unordered_map functionConfigs; - - Settings(bool isProducer, std::chrono::nanoseconds defaultTimeout, std::string_view funcConfs = "") - : isProducer(isProducer), defaultTimeout(defaultTimeout) { - if (funcConfs.empty()) { - return; - } - - const auto configs = nlohmann::json::parse(funcConfs, nullptr, false); - for(const auto& [key, value] : configs.items()) { - try { - uint8_t funcId = static_cast(std::stoi(key)); - FunctionConfig funcConfig { - .timeout = std::chrono::nanoseconds(value["timeout"].get()) - }; - functionConfigs[funcId] = funcConfig; - } catch (const std::exception &e) { - std::cerr << "Error parsing function configuration for key " << key << ": " << e.what() << std::endl; - } - } - } + FunctionConfigs functionConfigs; + + Settings(bool isProducer, std::chrono::nanoseconds defaultTimeout, + const FunctionConfigs& functionConfigs = {}) + : isProducer(isProducer), defaultTimeout(defaultTimeout), functionConfigs(functionConfigs) {} }; } diff --git a/source/bringauto/async_function_execution/TimeoutIdleStrategy.cpp b/source/bringauto/async_function_execution/TimeoutIdleStrategy.cpp new file mode 100644 index 0000000..e16ec0b --- /dev/null +++ b/source/bringauto/async_function_execution/TimeoutIdleStrategy.cpp @@ -0,0 +1,65 @@ +#include + + + +namespace bringauto::async_function_execution { + +int TimeoutIdleStrategy::idle(const int workCount, std::chrono::nanoseconds timeout) { + if (workCount > 0) { + reset(); + return 0; + } + return idle(timeout); +} + +void TimeoutIdleStrategy::reset() { + parkPeriodNs_ = minParkPeriodNs_; + state_ = IdleState::NOT_IDLE; + startTime_ = std::chrono::steady_clock::time_point(); +} + +int TimeoutIdleStrategy::idle(std::chrono::nanoseconds timeout) { + auto timeoutNs = timeout != std::chrono::nanoseconds(0) ? timeout : timeoutNs_; + auto now = std::chrono::steady_clock::now(); + if (timeoutNs != std::chrono::nanoseconds(0)) { + if (startTime_ == std::chrono::steady_clock::time_point()) { + startTime_ = now; + } + if (std::chrono::duration_cast(now - startTime_) >= timeoutNs) { + return -1; + } + } + + switch(state_) { + case IdleState::NOT_IDLE: + state_ = IdleState::SPINNING; + lastStateSwitchTime_ = now; + break; + + case IdleState::SPINNING: + aeron::concurrent::atomic::cpu_pause(); + if (std::chrono::duration_cast(now - lastStateSwitchTime_) >= maxSpinPeriodNs_) { + state_ = IdleState::YIELDING; + lastStateSwitchTime_ = now; + } + break; + + case IdleState::YIELDING: + std::this_thread::yield(); + if (std::chrono::duration_cast(now - lastStateSwitchTime_) >= maxYieldPeriodNs_) { + state_ = IdleState::PARKING; + parkPeriodNs_ = minParkPeriodNs_; + lastStateSwitchTime_ = now; + } + break; + + case IdleState::PARKING: + default: + std::this_thread::sleep_for(parkPeriodNs_); + parkPeriodNs_ = std::min(parkPeriodNs_ * 2, maxParkPeriodNs_); + break; + } + return 0; +} + +} diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index faf9a99..9f83660 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -3,8 +3,8 @@ PROJECT(async-function-execution) SET(CMAKE_CXX_STANDARD 20) -IF(CMAKE_BUILD_TYPE STREQUAL "Debug") - ADD_DEFINITIONS(-DDEBUG) +IF(NOT TARGET async-function-execution-shared) + MESSAGE(FATAL_ERROR "The async-function-execution-shared target was not found. Please build the tests as part of the async-function-execution-shared project.") ENDIF() # GTest static library is not compiled as pic diff --git a/test/README.md b/test/README.md index 71c3e88..6f5e9a4 100644 --- a/test/README.md +++ b/test/README.md @@ -6,9 +6,11 @@ ## Build +Tests are built as part of the main project. Don't use the CMakeList in the test folder. + ```bash mkdir -p _build_tests && cd _build_tests -cmake ../ -DCMLIB_DIR= -DBRINGAUTO_TESTS=ON -DCMAKE_PREFIX_PATH= +cmake ../ -DBRINGAUTO_TESTS=ON make ``` diff --git a/test/include/AsyncFunctionExecutorTests.hpp b/test/include/AsyncFunctionExecutorTests.hpp index 97cadc6..9936737 100644 --- a/test/include/AsyncFunctionExecutorTests.hpp +++ b/test/include/AsyncFunctionExecutorTests.hpp @@ -55,38 +55,36 @@ baafe::FunctionDefinition FunctionNoArgs { baafe::AsyncFunctionExecutor executorProducer { baafe::Config { .isProducer = true, - .defaultTimeout = std::chrono::seconds(1) + .defaultTimeout = std::chrono::seconds(1), + .functionConfigurations = baafe::structures::FunctionConfigs { { + { 1, { std::chrono::nanoseconds(1000000) } }, + { 2, { std::chrono::nanoseconds(2000000) } }, + { 3, { std::chrono::nanoseconds(3000000) } }, + { 4, { std::chrono::nanoseconds(4000000) } }, + { 5, { std::chrono::nanoseconds(5000000) } } + } } }, - baafe::FunctionList { std::tuple{ + baafe::FunctionList { FunctionAdd, FunctionMultiply, FunctionReturnSame, FunctionReturnSameString, FunctionNoArgs - } }, + }, std::make_unique() }; baafe::AsyncFunctionExecutor executorConsumer { baafe::Config { - .isProducer = false, - .functionConfigurations = R"( - { - "1": { "timeout": 1000000 }, - "2": { "timeout": 2000000 }, - "3": { "timeout": 3000000 }, - "4": { "timeout": 4000000 }, - "5": { "timeout": 5000000 } - } - )" + .isProducer = false }, - baafe::FunctionList { std::tuple{ + baafe::FunctionList { FunctionAdd, FunctionMultiply, FunctionReturnSame, FunctionReturnSameString, FunctionNoArgs - } }, + }, std::make_unique() }; diff --git a/test/include/MockClient.hpp b/test/include/MockClient.hpp index a7ae425..5b3bd9d 100644 --- a/test/include/MockClient.hpp +++ b/test/include/MockClient.hpp @@ -77,8 +77,10 @@ class MockClient : public bringauto::async_function_execution::clients::ClientIn return 0; }; - std::span waitForMessage(const uint32_t channelId) override { - (void)channelId; + std::span waitForMessage(const uint32_t channelId, std::chrono::nanoseconds timeout) override { + // Test if the timeout is correctly set for each function + EXPECT_EQ((channelId - 1000) * 1000000, timeout.count()); + if (messageBuffer_.empty()) { return {}; } diff --git a/test/integration_tests/main_segfault_consumer.cpp b/test/integration_tests/main_segfault_consumer.cpp index 8b5ec35..9e0ad54 100644 --- a/test/integration_tests/main_segfault_consumer.cpp +++ b/test/integration_tests/main_segfault_consumer.cpp @@ -29,9 +29,9 @@ baafe::AsyncFunctionExecutor executorConsumer { baafe::Config { .isProducer = false }, - baafe::FunctionList { std::tuple{ + baafe::FunctionList { Function - } } + } }; @@ -57,4 +57,4 @@ int main() { } return 0; -} \ No newline at end of file +} diff --git a/test/integration_tests/main_segfault_producer.cpp b/test/integration_tests/main_segfault_producer.cpp index 0cdfc1e..e454932 100644 --- a/test/integration_tests/main_segfault_producer.cpp +++ b/test/integration_tests/main_segfault_producer.cpp @@ -30,9 +30,9 @@ baafe::AsyncFunctionExecutor executorProducer { .isProducer = true, .defaultTimeout = std::chrono::seconds(1) }, - baafe::FunctionList { std::tuple{ + baafe::FunctionList { Function - } } + } }; int main() { @@ -74,4 +74,4 @@ int main() { std::cout << "Aeron Driver has been stopped." << std::endl; std::cout << "Test completed." << std::endl; return 0; -} \ No newline at end of file +} diff --git a/test/integration_tests/main_simple_function.cpp b/test/integration_tests/main_simple_function.cpp index 053d0ce..06f0710 100644 --- a/test/integration_tests/main_simple_function.cpp +++ b/test/integration_tests/main_simple_function.cpp @@ -17,18 +17,18 @@ baafe::AsyncFunctionExecutor executorProducer { .isProducer = true, .defaultTimeout = std::chrono::seconds(1) }, - baafe::FunctionList { std::tuple{ + baafe::FunctionList { FunctionAdd - } } + } }; baafe::AsyncFunctionExecutor executorConsumer { baafe::Config { .isProducer = false }, - baafe::FunctionList { std::tuple{ + baafe::FunctionList { FunctionAdd - } } + } }; void consumerLoop() { @@ -88,4 +88,4 @@ int main() { std::cout << "Aeron Driver has been stopped." << std::endl; std::cout << "Test completed." << std::endl; return 0; -} \ No newline at end of file +} diff --git a/test/source/AsyncFunctionExecutorTests.cpp b/test/source/AsyncFunctionExecutorTests.cpp index 822069d..9a58768 100644 --- a/test/source/AsyncFunctionExecutorTests.cpp +++ b/test/source/AsyncFunctionExecutorTests.cpp @@ -115,16 +115,14 @@ TEST_F(AsyncFunctionExecutorTests, ConfigForUndefinedFunction) { ASSERT_THROW(baafe::AsyncFunctionExecutor( baafe::Config { .isProducer = true, - .functionConfigurations = R"( - { - "99": { "timeout": 1000000 } - } - )" + .functionConfigurations = baafe::structures::FunctionConfigs { { + { 99, { std::chrono::nanoseconds(1000000) } } + } } }, - baafe::FunctionList { std::tuple{ + baafe::FunctionList { FunctionAdd, FunctionMultiply - } }, + }, std::make_unique() ), std::runtime_error); } From 5dca83fd0b6d4e45a18dc74479e2285f0f9d473c Mon Sep 17 00:00:00 2001 From: MarioIvancik Date: Thu, 2 Oct 2025 14:57:18 +0200 Subject: [PATCH 02/10] coderabbit suggestions --- README.md | 5 +- examples/README.md | 2 +- examples/main_consumer.cpp | 16 +++--- examples/main_producer.cpp | 7 ++- .../async_function_execution/AeronDriver.hpp | 4 +- .../AsyncFunctionExecutor.hpp | 32 ++++++------ .../TimeoutIdleStrategy.hpp | 17 +++--- .../clients/AeronClient.hpp | 26 ++++++++-- .../clients/ClientInterface.hpp | 5 +- .../structures/Settings.hpp | 4 +- .../async_function_execution/AeronDriver.cpp | 6 +-- .../TimeoutIdleStrategy.cpp | 4 ++ test/README.md | 2 +- test/include/AsyncFunctionExecutorTests.hpp | 2 +- test/include/MockClient.hpp | 52 +++++++++---------- .../main_segfault_consumer.cpp | 8 ++- .../main_segfault_producer.cpp | 14 +++-- .../main_simple_function.cpp | 10 +++- test/source/AsyncFunctionExecutorTests.cpp | 10 ++++ 19 files changed, 137 insertions(+), 89 deletions(-) diff --git a/README.md b/README.md index b467fd3..a108fa6 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ AsyncFunctionExecutor executorProducer { Config { .isProducer = true, // decides the mode of the executor .defaultTimeout = std::chrono::seconds(1) // polling timeout (should only be used when producer) - .functionConfigurations = structures::FuntionConfigs { { + .functionConfigurations = structures::FunctionConfigs { { { 1, { std::chrono::seconds(2) }} } } }, @@ -36,7 +36,8 @@ AsyncFunctionExecutor executorProducer { Before using any functions, connection needs to be established using the connect function: ```cpp -executorProducer.connect(); +// Returns -1 on a failed connection +int rc = executorProducer.connect(); ``` #### functionConfigurations diff --git a/examples/README.md b/examples/README.md index c86b67a..0e6ab59 100644 --- a/examples/README.md +++ b/examples/README.md @@ -4,7 +4,7 @@ This folder contains a simple example of the usage of this project. 3 executable ## Build -Examples are built as part of the main project. Don't use the CMakeList in the test folder. +Examples are built as part of the main project. Don't use the CMakeLists file in the examples folder. ```bash mkdir -p _build_example && cd _build_example diff --git a/examples/main_consumer.cpp b/examples/main_consumer.cpp index 4576d77..9867b10 100644 --- a/examples/main_consumer.cpp +++ b/examples/main_consumer.cpp @@ -8,7 +8,7 @@ using namespace bringauto::async_function_execution; struct SerializableString final { std::string value {}; SerializableString() = default; - SerializableString(std::string str) : value(std::move(str)) {} + explicit SerializableString(std::string str) : value(std::move(str)) {} std::span serialize() const { return std::span {reinterpret_cast(value.data()), value.size()}; @@ -45,8 +45,11 @@ int main() { FunctionList { ExampleFunc1, ExampleFunc2, ExampleFunc3 }, }; - executor.connect(); - + if (executor.connect() != 0) { + std::cerr << "Consumer: Failed to connect to executor" << std::endl; + return 1; + } + while (true) { auto [funcId, argBytes] = executor.pollFunction(); @@ -54,20 +57,19 @@ int main() { case 1: { auto [arg1, arg2, arg3] = executor.getFunctionArgs(ExampleFunc1, argBytes); std::cout << "Consumer: Received Function 1 call with args (" << arg1 << ", " << arg2.value << ", " << arg3 << ")." << std::endl; - executor.sendReturnMessage(funcId, "Func 1 return value"); + executor.sendReturnMessage(funcId, SerializableString{"Func 1 return value"}); break; } case 2: { auto [arg1, arg2] = executor.getFunctionArgs(ExampleFunc2, argBytes); std::cout << "Consumer: Received Function 2 call with args (" << arg1 << ", " << arg2.value << ")." << std::endl; - executor.sendReturnMessage(funcId, "Func 2 return value"); + executor.sendReturnMessage(funcId, SerializableString{"Func 2 return value"}); break; } case 3: { auto [arg1] = executor.getFunctionArgs(ExampleFunc3, argBytes); - SerializableString returnValue { "Result for input " + std::to_string(arg1) }; std::cout << "Consumer: Received Function 3 call with args (" << arg1 << ")" << std::endl; - executor.sendReturnMessage(funcId, "Func 3 return value"); + executor.sendReturnMessage(funcId, SerializableString{"Func 3 return value"}); break; } default: diff --git a/examples/main_producer.cpp b/examples/main_producer.cpp index c605f90..8c9943b 100644 --- a/examples/main_producer.cpp +++ b/examples/main_producer.cpp @@ -6,7 +6,7 @@ using namespace bringauto::async_function_execution; struct SerializableString final { std::string value {}; SerializableString() = default; - SerializableString(std::string str) : value(std::move(str)) {} + explicit SerializableString(std::string str) : value(std::move(str)) {} std::span serialize() const { return std::span {reinterpret_cast(value.data()), value.size()}; @@ -44,7 +44,10 @@ int main() { FunctionList { ExampleFunc1, ExampleFunc2, ExampleFunc3 }, }; - executor.connect(); + if (executor.connect() != 0) { + std::cerr << "Producer: Failed to connect to executor" << std::endl; + return 1; + } auto result1 = executor.callFunc(ExampleFunc1, 42, "Hello", 3.14f); std::cout << result1.value << std::endl; diff --git a/include/bringauto/async_function_execution/AeronDriver.hpp b/include/bringauto/async_function_execution/AeronDriver.hpp index df553bf..78100ea 100644 --- a/include/bringauto/async_function_execution/AeronDriver.hpp +++ b/include/bringauto/async_function_execution/AeronDriver.hpp @@ -24,13 +24,13 @@ class AeronDriver { * @brief Starts the Aeron Driver. * This method will block until the driver is stopped or an error occurs. */ - void run(); + void run() const; /** * @brief Checks if the Aeron Driver is running. * @return true if the driver is running, false otherwise. */ - bool isRunning() const; + [[nodiscard]] bool isRunning() const; /** * @brief Stops the Aeron Driver. diff --git a/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp b/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp index 1523908..c4d1ac9 100644 --- a/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp +++ b/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp @@ -48,7 +48,7 @@ concept HasSerialize = requires(const T& t) { template struct Return { const T value; - constexpr Return(T &&val) : value(std::forward(val)) {} + constexpr explicit Return(T &&val) : value(std::forward(val)) {} }; @@ -57,7 +57,7 @@ struct Return { */ template<> struct Return { - constexpr Return() noexcept {} + constexpr Return() noexcept = default; }; @@ -68,7 +68,7 @@ struct Return { template struct Arguments { const std::tuple values; - constexpr Arguments(Args &&...args) : values{std::forward(args)...} {} + constexpr explicit Arguments(Args &&...args) : values{std::forward(args)...} {} }; @@ -115,7 +115,7 @@ concept IsFunctionDefinition = requires { typename std::decay_t; } && template struct FunctionList { const std::tuple functions; - FunctionList(Funcs... funcs) : functions(std::move(funcs)...) {} + explicit FunctionList(Funcs... funcs) : functions(std::move(funcs)...) {} }; @@ -140,7 +140,7 @@ class AsyncFunctionExecutor { * @param functions A list of function definitions that the client can call or respond to. * @param client Optional custom client implementing ClientInterface. If not provided, a default AeronClient is used. */ - AsyncFunctionExecutor(Config config, + AsyncFunctionExecutor(const Config& config, const FunctionList &functions, std::unique_ptr client = nullptr) : client_(nullptr), settings_(config.isProducer, config.defaultTimeout, config.functionConfigurations), functions_(functions) { @@ -153,9 +153,9 @@ class AsyncFunctionExecutor { ); } - for (const auto& [funcId, _] : settings_.functionConfigs.configs) { + for (const auto &funcId: settings_.functionConfigs.configs | std::views::keys) { if (!isFunctionDefined(FunctionId{funcId})) { - throw std::runtime_error("Warning: Function ID " + std::to_string(static_cast(funcId)) + " in configuration is not defined in FunctionList."); + throw std::runtime_error("Warning: Function ID " + std::to_string(funcId) + " in configuration is not defined in FunctionList."); } } }; @@ -240,7 +240,7 @@ class AsyncFunctionExecutor { return std::make_tuple(FunctionId{}, std::span{}); // Error: Cannot start polling in producer mode } - auto requestBytes = client_->waitForAnyMessage(); + const auto requestBytes = client_->waitForAnyMessage(); if (requestBytes.empty()) { return std::make_tuple(FunctionId{}, std::span{}); // Error: No message received or timeout } @@ -268,7 +268,7 @@ class AsyncFunctionExecutor { throw std::runtime_error("Function ID not defined"); } - if (argBytes.size() < 1) { + if (argBytes.empty()) { throw std::invalid_argument("Not enough data to read argument count"); } @@ -282,7 +282,7 @@ class AsyncFunctionExecutor { std::tuple args; auto extractArg = [&](auto &arg) { if (pos >= argBytes.size()) throw std::invalid_argument("Unexpected end of data while reading argument size"); - uint16_t len = argBytes[pos] | (static_cast(argBytes[pos + 1]) << 8); + const uint16_t len = argBytes[pos] | (static_cast(argBytes[pos + 1]) << 8); pos += 2; if (pos + len > argBytes.size()) throw std::invalid_argument("Unexpected end of data while reading argument content"); @@ -352,7 +352,7 @@ class AsyncFunctionExecutor { template std::span serializeReturn(const FunctionId &funcId, const T &returnValue) { serializationBuffer_.clear(); - std::size_t totalSize = 3 + sizeof(returnValue); + const std::size_t totalSize = 3 + sizeof(returnValue); serializationBuffer_.reserve(totalSize); serializationBuffer_.push_back(funcId.value); appendArg(serializationBuffer_, returnValue); @@ -367,15 +367,15 @@ class AsyncFunctionExecutor { if (bytes.size() > MAX_ARGUMENT_SIZE) { throw std::invalid_argument("Serialized data too large"); } - uint16_t size = static_cast(bytes.size()); + const auto size = static_cast(bytes.size()); buffer.push_back(static_cast(size & 0xFF)); - buffer.push_back(static_cast((size >> 8) & 0xFF)); + buffer.push_back(static_cast(size >> 8 & 0xFF)); buffer.insert(buffer.end(), bytes.begin(), bytes.end()); } else { static_assert(std::is_trivially_copyable_v, "Argument type must be trivially copyable"); - uint16_t size = static_cast(sizeof(arg)); + const auto size = static_cast(sizeof(arg)); buffer.push_back(static_cast(size & 0xFF)); - buffer.push_back(static_cast((size >> 8) & 0xFF)); + buffer.push_back(static_cast(size >> 8 & 0xFF)); buffer.insert(buffer.end(), reinterpret_cast(&arg), reinterpret_cast(&arg) + sizeof(T)); } } @@ -398,7 +398,7 @@ class AsyncFunctionExecutor { std::tuple> deserializeRequest(const std::span& bytes) { - if (bytes.size() < 1) { + if (bytes.empty()) { throw std::invalid_argument("Not enough data to deserialize request"); } FunctionId funcId{ bytes[0] }; diff --git a/include/bringauto/async_function_execution/TimeoutIdleStrategy.hpp b/include/bringauto/async_function_execution/TimeoutIdleStrategy.hpp index 63efa5f..46f4815 100644 --- a/include/bringauto/async_function_execution/TimeoutIdleStrategy.hpp +++ b/include/bringauto/async_function_execution/TimeoutIdleStrategy.hpp @@ -1,11 +1,6 @@ #pragma once -#include -#include - -#include #include -#include @@ -23,11 +18,11 @@ class TimeoutIdleStrategy { * @param maxParkPeriodNs Maximum duration to sleep in the PARKING state. Default is 1 millisecond. */ explicit TimeoutIdleStrategy( - std::chrono::nanoseconds timeoutNs = std::chrono::nanoseconds(0), - std::chrono::nanoseconds maxSpinPeriodNs = std::chrono::duration(1000), - std::chrono::nanoseconds maxYieldPeriodNs = std::chrono::duration(2000), - std::chrono::nanoseconds minParkPeriodNs = std::chrono::nanoseconds(1000), - std::chrono::nanoseconds maxParkPeriodNs = std::chrono::duration(1) + const std::chrono::nanoseconds timeoutNs = std::chrono::nanoseconds(0), + const std::chrono::nanoseconds maxSpinPeriodNs = std::chrono::duration(1000), + const std::chrono::nanoseconds maxYieldPeriodNs = std::chrono::duration(2000), + const std::chrono::nanoseconds minParkPeriodNs = std::chrono::nanoseconds(1000), + const std::chrono::nanoseconds maxParkPeriodNs = std::chrono::duration(1) ) : maxSpinPeriodNs_(maxSpinPeriodNs), maxYieldPeriodNs_(maxYieldPeriodNs), minParkPeriodNs_(minParkPeriodNs), @@ -44,7 +39,7 @@ class TimeoutIdleStrategy { * configuration will be used. * @return 0 if idling continues, -1 if the timeout has been reached. */ - int idle(const int workCount, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(0)); + int idle(int workCount, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(0)); /** * @brief Resets the idle strategy to its initial state. diff --git a/include/bringauto/async_function_execution/clients/AeronClient.hpp b/include/bringauto/async_function_execution/clients/AeronClient.hpp index 0aedc49..492e58c 100644 --- a/include/bringauto/async_function_execution/clients/AeronClient.hpp +++ b/include/bringauto/async_function_execution/clients/AeronClient.hpp @@ -70,7 +70,12 @@ class AeronClient final : public ClientInterface { for (const auto &pubId : publicationIds) { id = aeron_->addPublication(aeronConnection_, pubId); std::shared_ptr publication = aeron_->findPublication(id); + const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); while (!publication) { + if (std::chrono::steady_clock::now() > deadline) { + std::cerr << "Aeron connection error: Timeout while waiting for publication" << std::endl; + return -1; // Error: Timeout + } std::this_thread::yield(); publication = aeron_->findPublication(id); } @@ -80,7 +85,12 @@ class AeronClient final : public ClientInterface { for (const auto &subId : subscriptionIds) { id = aeron_->addSubscription(aeronConnection_, subId); std::shared_ptr subscription = aeron_->findSubscription(id); + const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); while (!subscription) { + if (std::chrono::steady_clock::now() > deadline) { + std::cerr << "Aeron connection error: Timeout while waiting for subscription" << std::endl; + return -1; // Error: Timeout + } std::this_thread::yield(); subscription = aeron_->findSubscription(id); } @@ -109,8 +119,13 @@ class AeronClient final : public ClientInterface { * (NOT_CONNECTED = -1, BACK_PRESSURED = -2, ADMIN_ACTION = -3, PUBLICATION_CLOSED = -4) */ int sendMessage(const uint32_t channelId, std::span &messageBytes) override { - aeron::concurrent::AtomicBuffer srcBuffer(const_cast(messageBytes.data()), messageBytes.size()); - return aeronPublications_[channelId]->offer(srcBuffer, 0, messageBytes.size()); + const aeron::concurrent::AtomicBuffer srcBuffer(const_cast(messageBytes.data()), messageBytes.size()); + const auto it = aeronPublications_.find(channelId); + if (it == aeronPublications_.end()) { + std::cerr << "Aeron send error: Channel ID not found" << std::endl; + return -1; // Error: Channel ID not found + } + return it->second->offer(srcBuffer, 0, messageBytes.size()); }; @@ -122,9 +137,14 @@ class AeronClient final : public ClientInterface { * @return Bytes of the last message received. Returns an empty span on timeout or error. */ std::span waitForMessage(const uint32_t channelId, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(0)) override { + const auto it = aeronSubscriptions_.find(channelId); + if (it == aeronSubscriptions_.end()) { + std::cerr << "Aeron wait error: Channel ID not found" << std::endl; + return {}; // Error: Channel ID not found + } aeronPolling_[channelId] = true; while (aeronPolling_[channelId]) { - const int fragmentsRead = aeronSubscriptions_[channelId]->poll(*aeronHandler_, 10); + const int fragmentsRead = it->second->poll(*aeronHandler_, 10); if(aeronIdleStrategy_->idle(fragmentsRead, timeout) != 0) { aeronIdleStrategy_->reset(); return {}; // Error: Aeron message wait timed out diff --git a/include/bringauto/async_function_execution/clients/ClientInterface.hpp b/include/bringauto/async_function_execution/clients/ClientInterface.hpp index a0f7c92..e7c4f0e 100644 --- a/include/bringauto/async_function_execution/clients/ClientInterface.hpp +++ b/include/bringauto/async_function_execution/clients/ClientInterface.hpp @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include @@ -30,7 +29,7 @@ class ClientInterface { * @param messageBytes The bytes of the message to send. * @return Returns 0 on success, or a negative error code on failure. */ - virtual int sendMessage(const uint32_t channelId, std::span &messageBytes) = 0; + virtual int sendMessage(uint32_t channelId, std::span &messageBytes) = 0; /** * @brief Waits for a message from the specified channel ID. @@ -39,7 +38,7 @@ class ClientInterface { * @param timeout Maximum time to wait for a message before timing out. * @return Bytes of the last message received. Returns an empty span on timeout or error. */ - virtual std::span waitForMessage(const uint32_t channelId, std::chrono::nanoseconds timeout) = 0; + virtual std::span waitForMessage(uint32_t channelId, std::chrono::nanoseconds timeout) = 0; /** * @brief Waits for any message from all channels. diff --git a/include/bringauto/async_function_execution/structures/Settings.hpp b/include/bringauto/async_function_execution/structures/Settings.hpp index 9b1a654..da59e33 100644 --- a/include/bringauto/async_function_execution/structures/Settings.hpp +++ b/include/bringauto/async_function_execution/structures/Settings.hpp @@ -22,7 +22,7 @@ struct FunctionConfig { struct FunctionConfigs { std::unordered_map configs; FunctionConfigs() = default; - FunctionConfigs(std::unordered_map configs) : configs(std::move(configs)) {}; + explicit FunctionConfigs(std::unordered_map configs) : configs(std::move(configs)) {}; }; /** @@ -37,7 +37,7 @@ struct Settings { const std::chrono::nanoseconds defaultTimeout = std::chrono::nanoseconds(0); FunctionConfigs functionConfigs; - Settings(bool isProducer, std::chrono::nanoseconds defaultTimeout, + Settings(const bool isProducer, const std::chrono::nanoseconds defaultTimeout, const FunctionConfigs& functionConfigs = {}) : isProducer(isProducer), defaultTimeout(defaultTimeout), functionConfigs(functionConfigs) {} }; diff --git a/source/bringauto/async_function_execution/AeronDriver.cpp b/source/bringauto/async_function_execution/AeronDriver.cpp index 4ab869f..e622c76 100644 --- a/source/bringauto/async_function_execution/AeronDriver.cpp +++ b/source/bringauto/async_function_execution/AeronDriver.cpp @@ -1,6 +1,6 @@ #include -#include +#include @@ -26,7 +26,7 @@ AeronDriver::AeronDriver() { signal(SIGINT, signalHandler); signal(SIGTERM, signalHandler); aeron_driver_context_init(&driverContext_); - aeron_driver_context_set_driver_termination_hook(driverContext_, terminationHook, NULL); + aeron_driver_context_set_driver_termination_hook(driverContext_, terminationHook, nullptr); driverContext_->agent_on_start_func_delegate = driverContext_->agent_on_start_func; driverContext_->agent_on_start_state_delegate = driverContext_->agent_on_start_state; aeron_driver_context_set_agent_on_start_function(driverContext_, aeron_set_thread_affinity_on_start, driverContext_); @@ -34,7 +34,7 @@ AeronDriver::AeronDriver() { } -void AeronDriver::run() { +void AeronDriver::run() const { aeron_driver_start(driver_, true); while (isRunning()) { aeron_driver_main_idle_strategy(driver_, aeron_driver_main_do_work(driver_)); diff --git a/source/bringauto/async_function_execution/TimeoutIdleStrategy.cpp b/source/bringauto/async_function_execution/TimeoutIdleStrategy.cpp index e16ec0b..96ea857 100644 --- a/source/bringauto/async_function_execution/TimeoutIdleStrategy.cpp +++ b/source/bringauto/async_function_execution/TimeoutIdleStrategy.cpp @@ -1,5 +1,9 @@ #include +#include + +#include + namespace bringauto::async_function_execution { diff --git a/test/README.md b/test/README.md index 6f5e9a4..864634a 100644 --- a/test/README.md +++ b/test/README.md @@ -6,7 +6,7 @@ ## Build -Tests are built as part of the main project. Don't use the CMakeList in the test folder. +Tests are built as part of the main project. Don't use the CMakeLists file in the test folder. ```bash mkdir -p _build_tests && cd _build_tests diff --git a/test/include/AsyncFunctionExecutorTests.hpp b/test/include/AsyncFunctionExecutorTests.hpp index 9936737..44561fc 100644 --- a/test/include/AsyncFunctionExecutorTests.hpp +++ b/test/include/AsyncFunctionExecutorTests.hpp @@ -12,7 +12,7 @@ namespace baafe = bringauto::async_function_execution; struct SerializableString final { std::string value {}; SerializableString() = default; - SerializableString(std::string str) : value(std::move(str)) {} + explicit SerializableString(std::string str) : value(std::move(str)) {} std::span serialize() const { return std::span {reinterpret_cast(value.data()), value.size()}; diff --git a/test/include/MockClient.hpp b/test/include/MockClient.hpp index 5b3bd9d..1ba0a37 100644 --- a/test/include/MockClient.hpp +++ b/test/include/MockClient.hpp @@ -9,7 +9,7 @@ -class MockClient : public bringauto::async_function_execution::clients::ClientInterface { +class MockClient final : public bringauto::async_function_execution::clients::ClientInterface { public: MockClient() = default; ~MockClient() = default; @@ -42,7 +42,7 @@ class MockClient : public bringauto::async_function_execution::clients::ClientIn return 0; } - uint8_t funcId = messageBytes[0]; + const uint8_t funcId = messageBytes[0]; if (funcId == 4) { // FunctionReturnSameString auto stringArgs = deserializeStringRequest(messageBytes); @@ -52,17 +52,17 @@ class MockClient : public bringauto::async_function_execution::clients::ClientIn return 0; } - auto args = deserializeIntRequest(messageBytes); + const auto args = deserializeIntRequest(messageBytes); switch (funcId) { case 1: // FunctionAdd if (args.size() == 3) { - int sum = args[0] + args[1] + args[2]; + const int sum = args[0] + args[1] + args[2]; serializeIntResponse(funcId, sum); } break; case 2: // FunctionMultiply if (args.size() == 3) { - int product = args[0] * args[1] * args[2]; + const int product = args[0] * args[1] * args[2]; serializeIntResponse(funcId, product); } break; @@ -77,46 +77,46 @@ class MockClient : public bringauto::async_function_execution::clients::ClientIn return 0; }; - std::span waitForMessage(const uint32_t channelId, std::chrono::nanoseconds timeout) override { + std::span waitForMessage(const uint32_t channelId, const std::chrono::nanoseconds timeout) override { // Test if the timeout is correctly set for each function EXPECT_EQ((channelId - 1000) * 1000000, timeout.count()); if (messageBuffer_.empty()) { return {}; } - return std::span(messageBuffer_.data(), messageBuffer_.size()); + return {messageBuffer_.data(), messageBuffer_.size()}; } /// Will always return a message for FunctionAdd with arguments (10, 20, 30) std::span waitForAnyMessage() override { messageBuffer_.clear(); messageBuffer_.reserve(2 + 3 * (2 + sizeof(int))); // Function ID + Arg count + 3 args (size + data) - messageBuffer_.push_back(static_cast(1)); // Function ID - messageBuffer_.push_back(static_cast(3)); // Argument count + messageBuffer_.push_back(1); // Function ID + messageBuffer_.push_back(3); // Argument count int arg1 = 10; int arg2 = 20; int arg3 = 30; - messageBuffer_.push_back(static_cast(sizeof(int) & 0xFF)); - messageBuffer_.push_back(static_cast((sizeof(int) >> 8) & 0xFF)); + messageBuffer_.push_back(sizeof(int) & 0xFF); + messageBuffer_.push_back(sizeof(int) >> 8 & 0xFF); messageBuffer_.insert(messageBuffer_.end(), reinterpret_cast(&arg1), reinterpret_cast(&arg1) + sizeof(int)); - messageBuffer_.push_back(static_cast(sizeof(int) & 0xFF)); - messageBuffer_.push_back(static_cast((sizeof(int) >> 8) & 0xFF)); + messageBuffer_.push_back(sizeof(int) & 0xFF); + messageBuffer_.push_back(sizeof(int) >> 8 & 0xFF); messageBuffer_.insert(messageBuffer_.end(), reinterpret_cast(&arg2), reinterpret_cast(&arg2) + sizeof(int)); - messageBuffer_.push_back(static_cast(sizeof(int) & 0xFF)); - messageBuffer_.push_back(static_cast((sizeof(int) >> 8) & 0xFF)); + messageBuffer_.push_back(sizeof(int) & 0xFF); + messageBuffer_.push_back(sizeof(int) >> 8 & 0xFF); messageBuffer_.insert(messageBuffer_.end(), reinterpret_cast(&arg3), reinterpret_cast(&arg3) + sizeof(int)); return {messageBuffer_.data(), messageBuffer_.size()}; } private: /// Deserializes a request message into function ID and argument values the same way that AsyncFunctionExecutor does. - std::vector deserializeIntRequest(std::span &bytes) { + std::vector deserializeIntRequest(const std::span &bytes) { size_t pos = 1; - uint8_t argCount = bytes[pos++]; + const uint8_t argCount = bytes[pos++]; std::vector args; for (uint8_t i = 0; i < argCount; ++i) { - uint16_t argSize = bytes[pos] | (static_cast(bytes[pos + 1]) << 8); + const uint16_t argSize = bytes[pos] | (static_cast(bytes[pos + 1]) << 8); pos += 2; int argValue; std::memcpy(&argValue, bytes.data() + pos, sizeof(int)); @@ -127,13 +127,13 @@ class MockClient : public bringauto::async_function_execution::clients::ClientIn } /// Deserializes a request message with string arguments. - std::vector deserializeStringRequest(std::span &bytes) { + std::vector deserializeStringRequest(const std::span &bytes) { size_t pos = 1; - uint8_t argCount = bytes[pos++]; + const uint8_t argCount = bytes[pos++]; std::vector args; for (uint8_t i = 0; i < argCount; ++i) { - uint16_t argSize = bytes[pos] | (static_cast(bytes[pos + 1]) << 8); + const uint16_t argSize = bytes[pos] | (static_cast(bytes[pos + 1]) << 8); pos += 2; std::string argValue(reinterpret_cast(bytes.data() + pos), argSize); args.push_back(argValue); @@ -143,25 +143,25 @@ class MockClient : public bringauto::async_function_execution::clients::ClientIn } /// Serializes a response message the same way that AsyncFunctionExecutor does. - void serializeIntResponse(uint8_t funcId, int returnValue) { + void serializeIntResponse(const uint8_t funcId, const int returnValue) { std::vector buffer; buffer.push_back(funcId); - buffer.push_back(static_cast(sizeof(int))); + buffer.push_back(sizeof(int)); buffer.resize(3 + sizeof(int)); std::memcpy(buffer.data() + 3, &returnValue, sizeof(int)); messageBuffer_ = buffer; } /// Serializes a response message with a string return value. - void serializeStringResponse(uint8_t funcId, const std::string &data) { + void serializeStringResponse(const uint8_t funcId, const std::string &data) { std::vector buffer; buffer.push_back(funcId); if (data.size() > 65535) { throw std::invalid_argument("Data too large to serialize in MockClient"); } - uint16_t size = static_cast(data.size()); + const auto size = static_cast(data.size()); buffer.push_back(static_cast(size & 0xFF)); - buffer.push_back(static_cast((size >> 8) & 0xFF)); + buffer.push_back(static_cast(size >> 8 & 0xFF)); buffer.insert(buffer.end(), data.begin(), data.end()); messageBuffer_ = buffer; } diff --git a/test/integration_tests/main_segfault_consumer.cpp b/test/integration_tests/main_segfault_consumer.cpp index 9e0ad54..40f7e1b 100644 --- a/test/integration_tests/main_segfault_consumer.cpp +++ b/test/integration_tests/main_segfault_consumer.cpp @@ -9,7 +9,7 @@ namespace baafe = bringauto::async_function_execution; struct SerializableString final { std::string value {}; SerializableString() = default; - SerializableString(std::string str) : value(std::move(str)) {} + explicit SerializableString(std::string str) : value(std::move(str)) {} std::span serialize() const { return std::span {reinterpret_cast(value.data()), value.size()}; @@ -36,7 +36,11 @@ baafe::AsyncFunctionExecutor executorConsumer { int main() { - executorConsumer.connect(); + if (executorConsumer.connect() != 0) { + std::cerr << "Consumer: Failed to connect to executor" << std::endl; + return 1; + } + std::cout << "Consumer connected." << std::endl; while (true) { diff --git a/test/integration_tests/main_segfault_producer.cpp b/test/integration_tests/main_segfault_producer.cpp index e454932..23c4e2a 100644 --- a/test/integration_tests/main_segfault_producer.cpp +++ b/test/integration_tests/main_segfault_producer.cpp @@ -9,7 +9,8 @@ namespace baafe = bringauto::async_function_execution; struct SerializableString final { std::string value {}; SerializableString() = default; - SerializableString(std::string str) : value(std::move(str)) {} + + explicit SerializableString(std::string str) : value(std::move(str)) {} std::span serialize() const { return std::span {reinterpret_cast(value.data()), value.size()}; @@ -50,15 +51,18 @@ int main() { return -1; } - executorProducer.connect(); - std::cout << "Producer connected." << std::endl; + if (executorProducer.connect() != 0) { + std::cerr << "Producer: Failed to connect to executor" << std::endl; + return 1; + } + std::cout << "Producer connected." << std::endl; std::cout << "Turn on the consumer and press Enter to continue..." << std::endl; std::cin.get(); // Producer calls Function - SerializableString ret = executorProducer.callFunc(Function, "Hello, World!"); - + SerializableString ret = executorProducer.callFunc(Function, SerializableString{"Hello, World!"}); + // Short delay while a segfault is forced in the consumer std::this_thread::sleep_for(std::chrono::seconds(1)); diff --git a/test/integration_tests/main_simple_function.cpp b/test/integration_tests/main_simple_function.cpp index 06f0710..363cb68 100644 --- a/test/integration_tests/main_simple_function.cpp +++ b/test/integration_tests/main_simple_function.cpp @@ -62,8 +62,14 @@ int main() { return -1; } - executorProducer.connect(); - executorConsumer.connect(); + if (executorProducer.connect() != 0) { + std::cerr << "Producer: Failed to connect to executor" << std::endl; + return 1; + } + if (executorConsumer.connect() != 0) { + std::cerr << "Consumer: Failed to connect to executor" << std::endl; + return 1; + } std::cout << "Both producer and consumer connected." << std::endl; // Start consumer loop in a separate thread diff --git a/test/source/AsyncFunctionExecutorTests.cpp b/test/source/AsyncFunctionExecutorTests.cpp index 9a58768..f1b6363 100644 --- a/test/source/AsyncFunctionExecutorTests.cpp +++ b/test/source/AsyncFunctionExecutorTests.cpp @@ -126,3 +126,13 @@ TEST_F(AsyncFunctionExecutorTests, ConfigForUndefinedFunction) { std::make_unique() ), std::runtime_error); } + + +/** + * @brief Tests calling a function with argument size over 2^16 bytes. + */ +TEST_F(AsyncFunctionExecutorTests, ArgumentTooLarge) { + // Create a very large string argument + std::string largeString(70000, 'A'); + ASSERT_THROW(executorProducer.callFunc(FunctionReturnSameString, SerializableString{largeString}), std::invalid_argument); +} From 0bc5cad3b2d0399a7409387b290f31ff0a5ced8c Mon Sep 17 00:00:00 2001 From: MarioIvancik Date: Thu, 16 Oct 2025 11:48:05 +0200 Subject: [PATCH 03/10] use std::expected for callFunc return; return error on simultaneous callFunc calls --- CMakeLists.txt | 4 +- examples/CMakeLists.txt | 2 + examples/main_producer.cpp | 6 +- .../AsyncFunctionExecutor.hpp | 64 +++++++++++++------ test/CMakeLists.txt | 2 +- test/include/AsyncFunctionExecutorTests.hpp | 12 +++- test/include/MockClient.hpp | 12 ++++ .../main_segfault_producer.cpp | 2 +- .../main_simple_function.cpp | 2 +- test/source/AsyncFunctionExecutorTests.cpp | 30 +++++++-- 10 files changed, 101 insertions(+), 35 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 5092016..5b529ce 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 3.25 FATAL_ERROR) PROJECT(async-function-execution C CXX) SET(CMAKE_POSITION_INDEPENDENT_CODE ON) -SET(CMAKE_CXX_STANDARD 20) +SET(CMAKE_CXX_STANDARD 23) SET(ASYNC_FUNCTION_EXECUTION_VERSION 1.0.0) @@ -55,6 +55,8 @@ TARGET_LINK_LIBRARIES(${ASYNC_FUNCTION_EXECUTION_TARGET_NAME}-shared PUBLIC aeron::aeron_driver ) +target_compile_features(${ASYNC_FUNCTION_EXECUTION_TARGET_NAME}-shared PRIVATE cxx_std_23) + IF(BRINGAUTO_TESTS) ENABLE_TESTING() ADD_SUBDIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/test) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 93538c6..92e3fd7 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1,5 +1,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 3.25 FATAL_ERROR) +SET(CMAKE_CXX_STANDARD 23) + IF(NOT TARGET async-function-execution-shared) MESSAGE(FATAL_ERROR "The async-function-execution-shared target was not found. Please build the example as part of the async-function-execution-shared project.") ENDIF() diff --git a/examples/main_producer.cpp b/examples/main_producer.cpp index 8c9943b..d3f27f9 100644 --- a/examples/main_producer.cpp +++ b/examples/main_producer.cpp @@ -49,11 +49,11 @@ int main() { return 1; } - auto result1 = executor.callFunc(ExampleFunc1, 42, "Hello", 3.14f); + auto result1 = executor.callFunc(ExampleFunc1, 42, "Hello", 3.14f).value(); std::cout << result1.value << std::endl; - auto result2 = executor.callFunc(ExampleFunc2, 100, "World"); + auto result2 = executor.callFunc(ExampleFunc2, 100, "World").value(); std::cout << result2.value << std::endl; - auto result3 = executor.callFunc(ExampleFunc3, 123); + auto result3 = executor.callFunc(ExampleFunc3, 123).value(); std::cout << result3.value << std::endl; return 0; } diff --git a/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp b/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp index c4d1ac9..350f9b5 100644 --- a/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp +++ b/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp @@ -7,6 +7,7 @@ #include #include #include +#include @@ -126,6 +127,19 @@ template FunctionList(Funcs&&...) -> FunctionList...>; +/** + * @brief Enum class representing possible error states during function calls. + */ +enum class CallError { + InvalidExecutorType, + FunctionIdNotDefined, + ArgumentCountMismatch, + TimeoutOrNoResponse, + FunctionIdMismatch, + FunctionCallInProgress +}; + + /** * @brief This class provides a high-level interface for async communication, allowing function calls over shared memory * with serialization and deserialization. @@ -176,6 +190,7 @@ class AsyncFunctionExecutor { std::apply([&](const auto&... funcDefs) { (toProducer.push_back(funcDefs.id.value + MESSAGE_RETURN_CHANNEL_OFFSET), ...); (fromProducer.push_back(funcDefs.id.value), ...); + (callInProgress_.emplace(funcDefs.id.value, false), ...); }, functions_.functions); if (settings_.isProducer) { @@ -187,25 +202,29 @@ class AsyncFunctionExecutor { /** * @brief Calls a function defined in the FunctionList, sending arguments and waiting for a response. - * Can only be used in producer mode. Throws on error. + * Can only be used in producer mode. * * @param function The function definition of which function to call. * @param args The arguments to pass to the function. * @return The return value of the function. If the return type contains some byte buffer, - * the memory is valid until the next call to callFunc(). + * the memory is valid until the next call to callFunc(). On error, returns a CallError enum value. */ template - Ret callFunc(const FunctionDefinition &function, CallArgs&&... args) { + auto callFunc(const FunctionDefinition &function, CallArgs&&... args) -> std::expected { if (!settings_.isProducer) { - throw std::runtime_error("Cannot call function in consumer mode"); + return std::unexpected(CallError::InvalidExecutorType); } if (sizeof...(CallArgs) < sizeof...(FArgs)) { - throw std::invalid_argument("Argument count mismatch"); + return std::unexpected(CallError::ArgumentCountMismatch); } if (!isFunctionDefined(function.id)) { - throw std::runtime_error("Function ID not defined"); + return std::unexpected(CallError::FunctionIdNotDefined); } - + if (callInProgress_[function.id.value].load()) { + return std::unexpected(CallError::FunctionCallInProgress); + } + + callInProgress_[function.id.value].store(true); auto messageBytes = serializeArgs(function.id, args...); client_->sendMessage(function.id.value, messageBytes); @@ -215,13 +234,14 @@ class AsyncFunctionExecutor { settings_.defaultTimeout ); if (responseBytes.empty()) { - throw std::runtime_error("No response received or timeout"); + return std::unexpected(CallError::TimeoutOrNoResponse); } - - Ret response = deserializeReturn(function.id, responseBytes); + + auto response = deserializeReturn(function.id, responseBytes); + callInProgress_[function.id.value].store(false); if constexpr (std::is_same_v) { - return; + return {}; } else { return response; } @@ -382,18 +402,22 @@ class AsyncFunctionExecutor { template - T deserializeReturn(const FunctionId &funcId, const std::span& bytes) { + auto deserializeReturn(const FunctionId &funcId, const std::span& bytes) -> std::expected { if(funcId.value != bytes[0]) { - throw std::invalid_argument("Function ID mismatch in return value"); + return std::unexpected(CallError::FunctionIdMismatch); } - - T value; - if constexpr (HasSerialize) { - value.deserialize(std::span {bytes.data() + 3, bytes.size() - 3}); + + if constexpr (std::is_same_v) { + return {}; } else { - std::memcpy(&value, bytes.data() + 3, sizeof(T)); + T value; + if constexpr (HasSerialize) { + value.deserialize(std::span {bytes.data() + 3, bytes.size() - 3}); + } else { + std::memcpy(&value, bytes.data() + 3, sizeof(T)); + } + return value; } - return value; } @@ -413,6 +437,8 @@ class AsyncFunctionExecutor { std::unique_ptr client_; structures::Settings settings_; FunctionList functions_; + /// Map to track if a function call is in progress for a given function ID. + std::unordered_map callInProgress_{}; }; } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 9f83660..a825908 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -1,7 +1,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 3.25) PROJECT(async-function-execution) -SET(CMAKE_CXX_STANDARD 20) +SET(CMAKE_CXX_STANDARD 23) IF(NOT TARGET async-function-execution-shared) MESSAGE(FATAL_ERROR "The async-function-execution-shared target was not found. Please build the tests as part of the async-function-execution-shared project.") diff --git a/test/include/AsyncFunctionExecutorTests.hpp b/test/include/AsyncFunctionExecutorTests.hpp index 44561fc..cd89a89 100644 --- a/test/include/AsyncFunctionExecutorTests.hpp +++ b/test/include/AsyncFunctionExecutorTests.hpp @@ -52,6 +52,12 @@ baafe::FunctionDefinition FunctionNoArgs { baafe::Arguments { } }; +baafe::FunctionDefinition FunctionWait { + baafe::FunctionId { 6 }, + baafe::Return { }, + baafe::Arguments { } +}; + baafe::AsyncFunctionExecutor executorProducer { baafe::Config { .isProducer = true, @@ -61,7 +67,8 @@ baafe::AsyncFunctionExecutor executorProducer { { 2, { std::chrono::nanoseconds(2000000) } }, { 3, { std::chrono::nanoseconds(3000000) } }, { 4, { std::chrono::nanoseconds(4000000) } }, - { 5, { std::chrono::nanoseconds(5000000) } } + { 5, { std::chrono::nanoseconds(5000000) } }, + { 6, { std::chrono::nanoseconds(6000000) } } } } }, baafe::FunctionList { @@ -69,7 +76,8 @@ baafe::AsyncFunctionExecutor executorProducer { FunctionMultiply, FunctionReturnSame, FunctionReturnSameString, - FunctionNoArgs + FunctionNoArgs, + FunctionWait }, std::make_unique() }; diff --git a/test/include/MockClient.hpp b/test/include/MockClient.hpp index 1ba0a37..fe4db6f 100644 --- a/test/include/MockClient.hpp +++ b/test/include/MockClient.hpp @@ -52,6 +52,11 @@ class MockClient final : public bringauto::async_function_execution::clients::Cl return 0; } + if (funcId == 6) { // FunctionWait does not expect a response + messageBuffer_.clear(); + return 0; + } + const auto args = deserializeIntRequest(messageBytes); switch (funcId) { case 1: // FunctionAdd @@ -81,6 +86,13 @@ class MockClient final : public bringauto::async_function_execution::clients::Cl // Test if the timeout is correctly set for each function EXPECT_EQ((channelId - 1000) * 1000000, timeout.count()); + if (channelId == 6) { + // Simulate a long wait for FunctionWait + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + messageBuffer_.clear(); // No message to return + return {}; + } + if (messageBuffer_.empty()) { return {}; } diff --git a/test/integration_tests/main_segfault_producer.cpp b/test/integration_tests/main_segfault_producer.cpp index 23c4e2a..bab7c82 100644 --- a/test/integration_tests/main_segfault_producer.cpp +++ b/test/integration_tests/main_segfault_producer.cpp @@ -61,7 +61,7 @@ int main() { std::cin.get(); // Producer calls Function - SerializableString ret = executorProducer.callFunc(Function, SerializableString{"Hello, World!"}); + SerializableString ret = executorProducer.callFunc(Function, SerializableString{"Hello, World!"}).value(); // Short delay while a segfault is forced in the consumer std::this_thread::sleep_for(std::chrono::seconds(1)); diff --git a/test/integration_tests/main_simple_function.cpp b/test/integration_tests/main_simple_function.cpp index 363cb68..1570372 100644 --- a/test/integration_tests/main_simple_function.cpp +++ b/test/integration_tests/main_simple_function.cpp @@ -77,7 +77,7 @@ int main() { std::this_thread::sleep_for(std::chrono::seconds(1)); // Give consumer a moment to start // Producer calls FunctionAdd - int sum = executorProducer.callFunc(FunctionAdd, 10, 20, 30); + int sum = executorProducer.callFunc(FunctionAdd, 10, 20, 30).value(); std::cout << "Producer: FunctionAdd(10, 20, 30) returned: " << sum << std::endl; if (sum != 60) { std::cerr << "Unexpected sum result!" << std::endl; diff --git a/test/source/AsyncFunctionExecutorTests.cpp b/test/source/AsyncFunctionExecutorTests.cpp index f1b6363..300e658 100644 --- a/test/source/AsyncFunctionExecutorTests.cpp +++ b/test/source/AsyncFunctionExecutorTests.cpp @@ -6,11 +6,11 @@ * @brief Tests calling different functions and receiving correct return values. */ TEST_F(AsyncFunctionExecutorTests, CallDifferentFunctions) { - auto result = executorProducer.callFunc(FunctionAdd, 1, 2, 3); + auto result = executorProducer.callFunc(FunctionAdd, 1, 2, 3).value(); ASSERT_EQ(result, 6); - result = executorProducer.callFunc(FunctionMultiply, 2, 3, 4); + result = executorProducer.callFunc(FunctionMultiply, 2, 3, 4).value(); ASSERT_EQ(result, 24); - result = executorProducer.callFunc(FunctionReturnSame, 42); + result = executorProducer.callFunc(FunctionReturnSame, 42).value(); ASSERT_EQ(result, 42); } @@ -19,7 +19,7 @@ TEST_F(AsyncFunctionExecutorTests, CallDifferentFunctions) { * @brief Tests calling a function with a serializable string argument and return value. */ TEST_F(AsyncFunctionExecutorTests, CallFunctionWithSerializableString) { - auto result = executorProducer.callFunc(FunctionReturnSameString, SerializableString{"Hello, World!"}); + auto result = executorProducer.callFunc(FunctionReturnSameString, SerializableString{"Hello, World!"}).value(); ASSERT_EQ(result.value, "Hello, World!"); } @@ -73,8 +73,8 @@ TEST_F(AsyncFunctionExecutorTests, GetFunctionArgsInvalidData) { * @brief Tests checks for producer/consumer mode restrictions and error handling. */ TEST_F(AsyncFunctionExecutorTests, CallInvalidFunctionsProducerConsumer) { - // Consumer using callFunc should throw - ASSERT_THROW(executorConsumer.callFunc(FunctionAdd, 1, 2, 3), std::runtime_error); + // Consumer using callFunc returns InvalidExecutorType error + ASSERT_EQ(executorConsumer.callFunc(FunctionAdd, 1, 2, 3).error(), baafe::CallError::InvalidExecutorType); // Producer using pollFunction should return empty tuple auto [funcId, argBytes] = executorProducer.pollFunction(); @@ -100,7 +100,7 @@ TEST_F(AsyncFunctionExecutorTests, UndefinedFunction) { baafe::Arguments { int {}, int {} } }; - ASSERT_THROW(executorProducer.callFunc(FunctionUndefined, 1, 2), std::runtime_error); + ASSERT_EQ(executorProducer.callFunc(FunctionUndefined, 1, 2).error(), baafe::CallError::FunctionIdNotDefined); ASSERT_THROW(executorConsumer.getFunctionArgs(FunctionUndefined, std::span {}), std::runtime_error); int ret = executorConsumer.sendReturnMessage(FunctionUndefined.id, 42); ASSERT_EQ(ret, -1); @@ -136,3 +136,19 @@ TEST_F(AsyncFunctionExecutorTests, ArgumentTooLarge) { std::string largeString(70000, 'A'); ASSERT_THROW(executorProducer.callFunc(FunctionReturnSameString, SerializableString{largeString}), std::invalid_argument); } + + +/** + * @brief Tests the behavior when a function call is already in progress. + */ +TEST_F(AsyncFunctionExecutorTests, FunctionCallInProgress) { + // Start a thread that calls FunctionWait, which does not expect a response + std::thread waitThread([this]() { + executorProducer.callFunc(FunctionWait); + }); + // Give the thread a moment to start and set the callInProgress flag + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + // Now, calling FunctionWait again should return FunctionCallInProgress error + EXPECT_EQ(executorProducer.callFunc(FunctionWait).error(), baafe::CallError::FunctionCallInProgress); + waitThread.join(); +} From 67d16fb6e0cf925458fb7cd87f18e0ce23fc37a6 Mon Sep 17 00:00:00 2001 From: MarioIvancik Date: Thu, 16 Oct 2025 15:11:32 +0200 Subject: [PATCH 04/10] better handling of simultaneous callFunc calls --- .../AsyncFunctionExecutor.hpp | 14 +++++--------- test/include/MockClient.hpp | 18 +++++++++--------- test/source/AsyncFunctionExecutorTests.cpp | 1 + 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp b/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp index 350f9b5..6a32c21 100644 --- a/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp +++ b/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp @@ -224,7 +224,7 @@ class AsyncFunctionExecutor { return std::unexpected(CallError::FunctionCallInProgress); } - callInProgress_[function.id.value].store(true); + callInProgress_[function.id.value] = true; auto messageBytes = serializeArgs(function.id, args...); client_->sendMessage(function.id.value, messageBytes); @@ -234,17 +234,13 @@ class AsyncFunctionExecutor { settings_.defaultTimeout ); if (responseBytes.empty()) { + callInProgress_[function.id.value] = false; return std::unexpected(CallError::TimeoutOrNoResponse); } auto response = deserializeReturn(function.id, responseBytes); - callInProgress_[function.id.value].store(false); - - if constexpr (std::is_same_v) { - return {}; - } else { - return response; - } + callInProgress_[function.id.value] = false; + return response; } @@ -408,7 +404,7 @@ class AsyncFunctionExecutor { } if constexpr (std::is_same_v) { - return {}; + return std::expected{}; } else { T value; if constexpr (HasSerialize) { diff --git a/test/include/MockClient.hpp b/test/include/MockClient.hpp index fe4db6f..39d6e35 100644 --- a/test/include/MockClient.hpp +++ b/test/include/MockClient.hpp @@ -52,8 +52,9 @@ class MockClient final : public bringauto::async_function_execution::clients::Cl return 0; } - if (funcId == 6) { // FunctionWait does not expect a response - messageBuffer_.clear(); + if (funcId == 6) { // FunctionWait + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + serializeVoidResponse(funcId); return 0; } @@ -86,13 +87,6 @@ class MockClient final : public bringauto::async_function_execution::clients::Cl // Test if the timeout is correctly set for each function EXPECT_EQ((channelId - 1000) * 1000000, timeout.count()); - if (channelId == 6) { - // Simulate a long wait for FunctionWait - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - messageBuffer_.clear(); // No message to return - return {}; - } - if (messageBuffer_.empty()) { return {}; } @@ -178,6 +172,12 @@ class MockClient final : public bringauto::async_function_execution::clients::Cl messageBuffer_ = buffer; } + /// Serializes a response message for a void return type. + void serializeVoidResponse(const uint8_t funcId) { + std::vector buffer; + buffer.push_back(funcId); + messageBuffer_ = buffer; + } std::vector messageBuffer_; }; diff --git a/test/source/AsyncFunctionExecutorTests.cpp b/test/source/AsyncFunctionExecutorTests.cpp index 300e658..428e95d 100644 --- a/test/source/AsyncFunctionExecutorTests.cpp +++ b/test/source/AsyncFunctionExecutorTests.cpp @@ -151,4 +151,5 @@ TEST_F(AsyncFunctionExecutorTests, FunctionCallInProgress) { // Now, calling FunctionWait again should return FunctionCallInProgress error EXPECT_EQ(executorProducer.callFunc(FunctionWait).error(), baafe::CallError::FunctionCallInProgress); waitThread.join(); + ASSERT_NE(executorProducer.callFunc(FunctionWait).error(), baafe::CallError::FunctionCallInProgress); } From 4d64622bcdb6b6d3bb67a6feb5307ed8751461ef Mon Sep 17 00:00:00 2001 From: MarioIvancik Date: Thu, 16 Oct 2025 15:23:37 +0200 Subject: [PATCH 05/10] remove poinless cmake line --- CMakeLists.txt | 2 -- 1 file changed, 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 5b529ce..21f35f2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -55,8 +55,6 @@ TARGET_LINK_LIBRARIES(${ASYNC_FUNCTION_EXECUTION_TARGET_NAME}-shared PUBLIC aeron::aeron_driver ) -target_compile_features(${ASYNC_FUNCTION_EXECUTION_TARGET_NAME}-shared PRIVATE cxx_std_23) - IF(BRINGAUTO_TESTS) ENABLE_TESTING() ADD_SUBDIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/test) From 13f803b146fcf78d5ceddb23d3872c4365b9568b Mon Sep 17 00:00:00 2001 From: Jan Kubalek Date: Fri, 17 Oct 2025 15:08:42 +0200 Subject: [PATCH 06/10] Link against shared client --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 21f35f2..b77c682 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -51,7 +51,7 @@ CMDEF_ADD_LIBRARY( TARGET_LINK_LIBRARIES(${ASYNC_FUNCTION_EXECUTION_TARGET_NAME}-shared PUBLIC aeron::aeron - aeron::aeron_client + aeron::aeron_client_shared aeron::aeron_driver ) From 609e1fbaf839d37df3c36853776c974a1b5ac89b Mon Sep 17 00:00:00 2001 From: MarioIvancik Date: Thu, 30 Oct 2025 14:05:31 +0100 Subject: [PATCH 07/10] better constants --- examples/main_consumer.cpp | 16 +++++++---- .../AsyncFunctionExecutor.hpp | 28 +++++++------------ .../async_function_execution/Constants.hpp | 5 +++- .../clients/AeronClient.hpp | 4 +-- .../structures/Settings.hpp | 23 ++++++++++++++- test/include/AsyncFunctionExecutorTests.hpp | 12 ++++---- test/include/MockClient.hpp | 3 +- test/source/AsyncFunctionExecutorTests.cpp | 3 +- 8 files changed, 58 insertions(+), 36 deletions(-) diff --git a/examples/main_consumer.cpp b/examples/main_consumer.cpp index 9867b10..e0a65a4 100644 --- a/examples/main_consumer.cpp +++ b/examples/main_consumer.cpp @@ -18,20 +18,24 @@ struct SerializableString final { } }; +constexpr int ExampleFunc1Id = 1; +constexpr int ExampleFunc2Id = 2; +constexpr int ExampleFunc3Id = 3; + FunctionDefinition ExampleFunc1 { - FunctionId { 1 }, + FunctionId { ExampleFunc1Id }, Return { SerializableString {} }, Arguments { int {}, SerializableString {}, float {} } }; FunctionDefinition ExampleFunc2 { - FunctionId { 2 }, + FunctionId { ExampleFunc2Id }, Return { SerializableString {} }, Arguments { int {}, SerializableString {} } }; FunctionDefinition ExampleFunc3 { - FunctionId { 3 }, + FunctionId { ExampleFunc3Id }, Return { SerializableString {} }, Arguments { int {} } }; @@ -54,19 +58,19 @@ int main() { auto [funcId, argBytes] = executor.pollFunction(); switch (funcId.value) { - case 1: { + case ExampleFunc1Id: { auto [arg1, arg2, arg3] = executor.getFunctionArgs(ExampleFunc1, argBytes); std::cout << "Consumer: Received Function 1 call with args (" << arg1 << ", " << arg2.value << ", " << arg3 << ")." << std::endl; executor.sendReturnMessage(funcId, SerializableString{"Func 1 return value"}); break; } - case 2: { + case ExampleFunc2Id: { auto [arg1, arg2] = executor.getFunctionArgs(ExampleFunc2, argBytes); std::cout << "Consumer: Received Function 2 call with args (" << arg1 << ", " << arg2.value << ")." << std::endl; executor.sendReturnMessage(funcId, SerializableString{"Func 2 return value"}); break; } - case 3: { + case ExampleFunc3Id: { auto [arg1] = executor.getFunctionArgs(ExampleFunc3, argBytes); std::cout << "Consumer: Received Function 3 call with args (" << arg1 << ")" << std::endl; executor.sendReturnMessage(funcId, SerializableString{"Func 3 return value"}); diff --git a/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp b/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp index 6a32c21..7d800a9 100644 --- a/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp +++ b/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp @@ -120,23 +120,16 @@ struct FunctionList { }; -/** - * @brief Deduction guide for FunctionList to simplify its construction. - */ -template -FunctionList(Funcs&&...) -> FunctionList...>; - - /** * @brief Enum class representing possible error states during function calls. */ enum class CallError { - InvalidExecutorType, - FunctionIdNotDefined, - ArgumentCountMismatch, - TimeoutOrNoResponse, - FunctionIdMismatch, - FunctionCallInProgress + InvalidExecutorType, // Called a producer-only function in consumer mode or vice versa + FunctionIdNotDefined, // Function ID is not defined in the FunctionList + ArgumentCountMismatch, // Number of expected arguments does not match + TimeoutOrNoResponse, // No response received within the timeout + FunctionIdMismatch, // Function ID does not match + FunctionCallInProgress // Function call is already in progress }; @@ -167,7 +160,7 @@ class AsyncFunctionExecutor { ); } - for (const auto &funcId: settings_.functionConfigs.configs | std::views::keys) { + for (const auto &funcId: settings_.functionConfigs.getFunctionIds()) { if (!isFunctionDefined(FunctionId{funcId})) { throw std::runtime_error("Warning: Function ID " + std::to_string(funcId) + " in configuration is not defined in FunctionList."); } @@ -228,11 +221,10 @@ class AsyncFunctionExecutor { auto messageBytes = serializeArgs(function.id, args...); client_->sendMessage(function.id.value, messageBytes); + auto timeout = settings_.functionConfigs.getConfig(function.id.value).timeout; auto responseBytes = client_->waitForMessage(function.id.value + MESSAGE_RETURN_CHANNEL_OFFSET, - settings_.functionConfigs.configs.contains(function.id.value) ? - settings_.functionConfigs.configs[function.id.value].timeout : - settings_.defaultTimeout - ); + timeout == std::chrono::nanoseconds(0) ? settings_.defaultTimeout : timeout); + if (responseBytes.empty()) { callInProgress_[function.id.value] = false; return std::unexpected(CallError::TimeoutOrNoResponse); diff --git a/include/bringauto/async_function_execution/Constants.hpp b/include/bringauto/async_function_execution/Constants.hpp index 78c5ad7..7c01fdf 100644 --- a/include/bringauto/async_function_execution/Constants.hpp +++ b/include/bringauto/async_function_execution/Constants.hpp @@ -15,5 +15,8 @@ constexpr int MAX_ARGUMENT_SIZE = 65535; /// Default Aeron connection string for communication over shared memory. constexpr std::string_view DEFAULT_AERON_CONNECTION = "aeron:ipc"; - + +/// Timeout duration for establishing stream connections. +constexpr std::chrono::milliseconds STREAM_CONNECTION_TIMEOUT = std::chrono::milliseconds(5000); + } diff --git a/include/bringauto/async_function_execution/clients/AeronClient.hpp b/include/bringauto/async_function_execution/clients/AeronClient.hpp index 492e58c..9de589d 100644 --- a/include/bringauto/async_function_execution/clients/AeronClient.hpp +++ b/include/bringauto/async_function_execution/clients/AeronClient.hpp @@ -70,7 +70,7 @@ class AeronClient final : public ClientInterface { for (const auto &pubId : publicationIds) { id = aeron_->addPublication(aeronConnection_, pubId); std::shared_ptr publication = aeron_->findPublication(id); - const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); + const auto deadline = std::chrono::steady_clock::now() + STREAM_CONNECTION_TIMEOUT; while (!publication) { if (std::chrono::steady_clock::now() > deadline) { std::cerr << "Aeron connection error: Timeout while waiting for publication" << std::endl; @@ -85,7 +85,7 @@ class AeronClient final : public ClientInterface { for (const auto &subId : subscriptionIds) { id = aeron_->addSubscription(aeronConnection_, subId); std::shared_ptr subscription = aeron_->findSubscription(id); - const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); + const auto deadline = std::chrono::steady_clock::now() + STREAM_CONNECTION_TIMEOUT; while (!subscription) { if (std::chrono::steady_clock::now() > deadline) { std::cerr << "Aeron connection error: Timeout while waiting for subscription" << std::endl; diff --git a/include/bringauto/async_function_execution/structures/Settings.hpp b/include/bringauto/async_function_execution/structures/Settings.hpp index da59e33..fd538f1 100644 --- a/include/bringauto/async_function_execution/structures/Settings.hpp +++ b/include/bringauto/async_function_execution/structures/Settings.hpp @@ -20,9 +20,30 @@ struct FunctionConfig { * configs: Map of FunctionId to FunctionConfig. */ struct FunctionConfigs { - std::unordered_map configs; FunctionConfigs() = default; explicit FunctionConfigs(std::unordered_map configs) : configs(std::move(configs)) {}; + + FunctionConfig getConfig(const uint8_t functionId) const { + if (configs.find(functionId) != configs.end()) { + return configs.at(functionId); + } + return FunctionConfig{}; + } + + void setConfig(const uint8_t functionId, const FunctionConfig& config) { + configs[functionId] = config; + } + + std::vector getFunctionIds() const { + std::vector functionIds; + for (const auto& [funcId, _] : configs) { + functionIds.push_back(funcId); + } + return functionIds; + } + + private: + std::unordered_map configs; }; /** diff --git a/test/include/AsyncFunctionExecutorTests.hpp b/test/include/AsyncFunctionExecutorTests.hpp index cd89a89..a26bd21 100644 --- a/test/include/AsyncFunctionExecutorTests.hpp +++ b/test/include/AsyncFunctionExecutorTests.hpp @@ -63,12 +63,12 @@ baafe::AsyncFunctionExecutor executorProducer { .isProducer = true, .defaultTimeout = std::chrono::seconds(1), .functionConfigurations = baafe::structures::FunctionConfigs { { - { 1, { std::chrono::nanoseconds(1000000) } }, - { 2, { std::chrono::nanoseconds(2000000) } }, - { 3, { std::chrono::nanoseconds(3000000) } }, - { 4, { std::chrono::nanoseconds(4000000) } }, - { 5, { std::chrono::nanoseconds(5000000) } }, - { 6, { std::chrono::nanoseconds(6000000) } } + { FunctionAdd.id.value, { std::chrono::nanoseconds(1000000) } }, + { FunctionMultiply.id.value, { std::chrono::nanoseconds(2000000) } }, + { FunctionReturnSame.id.value, { std::chrono::nanoseconds(3000000) } }, + { FunctionReturnSameString.id.value, { std::chrono::nanoseconds(4000000) } }, + { FunctionNoArgs.id.value, { std::chrono::nanoseconds(5000000) } }, + { FunctionWait.id.value, { std::chrono::nanoseconds(6000000) } } } } }, baafe::FunctionList { diff --git a/test/include/MockClient.hpp b/test/include/MockClient.hpp index 39d6e35..4c350db 100644 --- a/test/include/MockClient.hpp +++ b/test/include/MockClient.hpp @@ -53,7 +53,8 @@ class MockClient final : public bringauto::async_function_execution::clients::Cl } if (funcId == 6) { // FunctionWait - std::this_thread::sleep_for(std::chrono::milliseconds(200)); + // 250 ms because the test waits for 200 ms before checking call in progress + std::this_thread::sleep_for(std::chrono::milliseconds(250)); serializeVoidResponse(funcId); return 0; } diff --git a/test/source/AsyncFunctionExecutorTests.cpp b/test/source/AsyncFunctionExecutorTests.cpp index 428e95d..79848e6 100644 --- a/test/source/AsyncFunctionExecutorTests.cpp +++ b/test/source/AsyncFunctionExecutorTests.cpp @@ -147,7 +147,8 @@ TEST_F(AsyncFunctionExecutorTests, FunctionCallInProgress) { executorProducer.callFunc(FunctionWait); }); // Give the thread a moment to start and set the callInProgress flag - std::this_thread::sleep_for(std::chrono::milliseconds(50)); + // 200 ms is chosen to account for worst case scenario thread creation time plus some extra time for function execution + std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Now, calling FunctionWait again should return FunctionCallInProgress error EXPECT_EQ(executorProducer.callFunc(FunctionWait).error(), baafe::CallError::FunctionCallInProgress); waitThread.join(); From 570a0d3279562a2c7da0201e0d2cd7bb73153ef0 Mon Sep 17 00:00:00 2001 From: MarioIvancik Date: Mon, 3 Nov 2025 11:28:15 +0100 Subject: [PATCH 08/10] added an option to offset channel IDs --- .../AsyncFunctionExecutor.hpp | 16 ++++++++--- .../async_function_execution/Constants.hpp | 4 +-- test/source/AsyncFunctionExecutorTests.cpp | 27 +++++++++++++++++++ 3 files changed, 41 insertions(+), 6 deletions(-) diff --git a/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp b/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp index 7d800a9..95de0ae 100644 --- a/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp +++ b/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp @@ -173,17 +173,25 @@ class AsyncFunctionExecutor { /** * @brief Connects the client to the media driver and sets up communication channels. * Needs to be called before any function calls or polling. + * @param channelOffset Optional offset to add to all function channel IDs. Default is 0. + * Use this when multiple executors are used in the same process to avoid channel ID conflicts. * * @return Returns 0 on success, or a negative error code on failure. */ - int connect() { + int connect(uint32_t channelOffset = 0) { + if (channelOffset > (UINT32_MAX / (MESSAGE_RETURN_CHANNEL_OFFSET * 10))) { + std::cerr << "Channel offset too large" << std::endl; + return -1; // Error: Channel offset too large + } + channelOffset = channelOffset * (MESSAGE_RETURN_CHANNEL_OFFSET * 10); + std::vector toProducer; std::vector fromProducer; std::apply([&](const auto&... funcDefs) { - (toProducer.push_back(funcDefs.id.value + MESSAGE_RETURN_CHANNEL_OFFSET), ...); - (fromProducer.push_back(funcDefs.id.value), ...); - (callInProgress_.emplace(funcDefs.id.value, false), ...); + (toProducer.push_back(funcDefs.id.value + channelOffset + MESSAGE_RETURN_CHANNEL_OFFSET), ...); + (fromProducer.push_back(funcDefs.id.value + channelOffset), ...); + (callInProgress_.emplace(funcDefs.id.value + channelOffset, false), ...); }, functions_.functions); if (settings_.isProducer) { diff --git a/include/bringauto/async_function_execution/Constants.hpp b/include/bringauto/async_function_execution/Constants.hpp index 7c01fdf..47cb06c 100644 --- a/include/bringauto/async_function_execution/Constants.hpp +++ b/include/bringauto/async_function_execution/Constants.hpp @@ -9,9 +9,9 @@ namespace bringauto::async_function_execution { /// Maximum number of fragments to process in a single poll operation. constexpr int POLL_FRAGMENTS_LIMIT = 10; /// Offset added to function ID to determine the return message channel ID. -constexpr int MESSAGE_RETURN_CHANNEL_OFFSET = 1000; +constexpr unsigned int MESSAGE_RETURN_CHANNEL_OFFSET = 1000; /// Maximum size for serialized function arguments. -constexpr int MAX_ARGUMENT_SIZE = 65535; +constexpr unsigned int MAX_ARGUMENT_SIZE = 65535; /// Default Aeron connection string for communication over shared memory. constexpr std::string_view DEFAULT_AERON_CONNECTION = "aeron:ipc"; diff --git a/test/source/AsyncFunctionExecutorTests.cpp b/test/source/AsyncFunctionExecutorTests.cpp index 79848e6..f8213c5 100644 --- a/test/source/AsyncFunctionExecutorTests.cpp +++ b/test/source/AsyncFunctionExecutorTests.cpp @@ -154,3 +154,30 @@ TEST_F(AsyncFunctionExecutorTests, FunctionCallInProgress) { waitThread.join(); ASSERT_NE(executorProducer.callFunc(FunctionWait).error(), baafe::CallError::FunctionCallInProgress); } + +/** + * @brief Tests the connect function with different channel offsets. + */ +TEST(AsyncFunctionExecutorTestsStandalone, ConnectTest) { + baafe::AsyncFunctionExecutor executor1 { + baafe::Config { + .isProducer = true + }, + baafe::FunctionList { }, + std::make_unique() + }; + baafe::AsyncFunctionExecutor executor2 { + baafe::Config { + .isProducer = false + }, + baafe::FunctionList { }, + std::make_unique() + }; + + // Try a valid channel offset + int ret = executor1.connect(5); + ASSERT_EQ(ret, 0); + // Try an invalid channel offset + ret = executor2.connect(40000000); + ASSERT_EQ(ret, -1); +} From faa7238bf9156bc0364359253c32b59c14138a59 Mon Sep 17 00:00:00 2001 From: MarioIvancik Date: Mon, 3 Nov 2025 12:47:37 +0100 Subject: [PATCH 09/10] fix channel offset --- .../AsyncFunctionExecutor.hpp | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp b/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp index 95de0ae..ef2eae9 100644 --- a/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp +++ b/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp @@ -178,20 +178,20 @@ class AsyncFunctionExecutor { * * @return Returns 0 on success, or a negative error code on failure. */ - int connect(uint32_t channelOffset = 0) { + int connect(const uint32_t channelOffset = 0) { if (channelOffset > (UINT32_MAX / (MESSAGE_RETURN_CHANNEL_OFFSET * 10))) { std::cerr << "Channel offset too large" << std::endl; return -1; // Error: Channel offset too large } - channelOffset = channelOffset * (MESSAGE_RETURN_CHANNEL_OFFSET * 10); + channelOffset_ = channelOffset * (MESSAGE_RETURN_CHANNEL_OFFSET * 10); std::vector toProducer; std::vector fromProducer; std::apply([&](const auto&... funcDefs) { - (toProducer.push_back(funcDefs.id.value + channelOffset + MESSAGE_RETURN_CHANNEL_OFFSET), ...); - (fromProducer.push_back(funcDefs.id.value + channelOffset), ...); - (callInProgress_.emplace(funcDefs.id.value + channelOffset, false), ...); + (toProducer.push_back(funcDefs.id.value + channelOffset_ + MESSAGE_RETURN_CHANNEL_OFFSET), ...); + (fromProducer.push_back(funcDefs.id.value + channelOffset_), ...); + (callInProgress_.emplace(funcDefs.id.value + channelOffset_, false), ...); }, functions_.functions); if (settings_.isProducer) { @@ -227,10 +227,10 @@ class AsyncFunctionExecutor { callInProgress_[function.id.value] = true; auto messageBytes = serializeArgs(function.id, args...); - client_->sendMessage(function.id.value, messageBytes); + client_->sendMessage(function.id.value + channelOffset_, messageBytes); auto timeout = settings_.functionConfigs.getConfig(function.id.value).timeout; - auto responseBytes = client_->waitForMessage(function.id.value + MESSAGE_RETURN_CHANNEL_OFFSET, + auto responseBytes = client_->waitForMessage(function.id.value + channelOffset_ + MESSAGE_RETURN_CHANNEL_OFFSET, timeout == std::chrono::nanoseconds(0) ? settings_.defaultTimeout : timeout); if (responseBytes.empty()) { @@ -340,7 +340,7 @@ class AsyncFunctionExecutor { } auto messageBytes = serializeReturn(functionId, returnValue); - return client_->sendMessage(functionId.value + MESSAGE_RETURN_CHANNEL_OFFSET, messageBytes); + return client_->sendMessage(functionId.value + channelOffset_ + MESSAGE_RETURN_CHANNEL_OFFSET, messageBytes); } private: @@ -435,6 +435,8 @@ class AsyncFunctionExecutor { FunctionList functions_; /// Map to track if a function call is in progress for a given function ID. std::unordered_map callInProgress_{}; + /// Channel ID offset to apply to all function channel IDs. + uint32_t channelOffset_ {}; }; } From ebd4c6508f6ebc916fd7b672a14c6a63319f7513 Mon Sep 17 00:00:00 2001 From: MarioIvancik Date: Mon, 3 Nov 2025 14:04:46 +0100 Subject: [PATCH 10/10] fix callInProgress flags --- .../async_function_execution/AsyncFunctionExecutor.hpp | 2 +- test/include/AsyncFunctionExecutorTests.hpp | 5 +++-- test/include/MockClient.hpp | 6 +++--- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp b/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp index ef2eae9..429eedb 100644 --- a/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp +++ b/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp @@ -191,7 +191,7 @@ class AsyncFunctionExecutor { std::apply([&](const auto&... funcDefs) { (toProducer.push_back(funcDefs.id.value + channelOffset_ + MESSAGE_RETURN_CHANNEL_OFFSET), ...); (fromProducer.push_back(funcDefs.id.value + channelOffset_), ...); - (callInProgress_.emplace(funcDefs.id.value + channelOffset_, false), ...); + (callInProgress_.emplace(funcDefs.id.value, false), ...); }, functions_.functions); if (settings_.isProducer) { diff --git a/test/include/AsyncFunctionExecutorTests.hpp b/test/include/AsyncFunctionExecutorTests.hpp index a26bd21..92ffc17 100644 --- a/test/include/AsyncFunctionExecutorTests.hpp +++ b/test/include/AsyncFunctionExecutorTests.hpp @@ -100,8 +100,9 @@ baafe::AsyncFunctionExecutor executorConsumer { class AsyncFunctionExecutorTests : public ::testing::Test { protected: static void SetUpTestSuite() { - executorProducer.connect(); - executorConsumer.connect(); + // Connect both executors with an arbitrary channel offset of 42 + executorProducer.connect(42); + executorConsumer.connect(42); } void SetUp() override {} diff --git a/test/include/MockClient.hpp b/test/include/MockClient.hpp index 4c350db..6b31eea 100644 --- a/test/include/MockClient.hpp +++ b/test/include/MockClient.hpp @@ -20,14 +20,14 @@ class MockClient final : public bringauto::async_function_execution::clients::Cl } int sendMessage(const uint32_t channelId, std::span &messageBytes) override { - if (channelId > 1000) { + if (channelId > 421000) { // Validate that this is a return message if (messageBytes.size() != 3 + sizeof(int)) { std::cerr << "Invalid return message size: " << messageBytes.size() << std::endl; return -1; // Error: Invalid return message size } - if (messageBytes[0] != static_cast(channelId - 1000) || messageBytes[1] != sizeof(int)) { + if (messageBytes[0] != static_cast(channelId - 421000) || messageBytes[1] != sizeof(int)) { std::cerr << "Invalid return message format." << std::endl; return -2; // Error: Invalid return message format } @@ -86,7 +86,7 @@ class MockClient final : public bringauto::async_function_execution::clients::Cl std::span waitForMessage(const uint32_t channelId, const std::chrono::nanoseconds timeout) override { // Test if the timeout is correctly set for each function - EXPECT_EQ((channelId - 1000) * 1000000, timeout.count()); + EXPECT_EQ((channelId - 421000) * 1000000, timeout.count()); if (messageBuffer_.empty()) { return {};