diff --git a/CMakeLists.txt b/CMakeLists.txt index fd8ae41..b77c682 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,9 +2,9 @@ CMAKE_MINIMUM_REQUIRED(VERSION 3.25 FATAL_ERROR) PROJECT(async-function-execution C CXX) SET(CMAKE_POSITION_INDEPENDENT_CODE ON) -SET(CMAKE_CXX_STANDARD 20) +SET(CMAKE_CXX_STANDARD 23) -SET(ASYNC_FUNCTION_EXECUTION_VERSION 0.1.0) +SET(ASYNC_FUNCTION_EXECUTION_VERSION 1.0.0) FIND_PACKAGE(CMLIB COMPONENTS CMDEF CMUTIL @@ -14,7 +14,6 @@ FIND_PACKAGE(CMLIB INCLUDE(GNUInstallDirs) SET(ASYNC_FUNCTION_EXECUTION_TARGET_NAME async-function-execution) -SET(ASYNC_FUNCTION_EXECUTION_ALIAS_NAME ${PROJECT_NAME}::async-function-execution) OPTION(BRINGAUTO_SAMPLES OFF) OPTION(BRINGAUTO_PACKAGE "Package creation" OFF) @@ -38,7 +37,6 @@ IF (BRINGAUTO_PACKAGE) ENDIF () FIND_PACKAGE(aeron 1.48.6 REQUIRED) -FIND_PACKAGE(nlohmann_json 3.2.0 REQUIRED) FILE(GLOB_RECURSE source_files ${CMAKE_CURRENT_LIST_DIR}/source/*) @@ -51,19 +49,15 @@ CMDEF_ADD_LIBRARY( VERSION ${ASYNC_FUNCTION_EXECUTION_VERSION} ) -# I guess this is not needed when we use package? -#ADD_LIBRARY(${ASYNC_FUNCTION_EXECUTION_ALIAS_NAME} ALIAS "${ASYNC_FUNCTION_EXECUTION_TARGET_NAME}-shared") - TARGET_LINK_LIBRARIES(${ASYNC_FUNCTION_EXECUTION_TARGET_NAME}-shared PUBLIC aeron::aeron - aeron::aeron_client + aeron::aeron_client_shared aeron::aeron_driver - nlohmann_json::nlohmann_json ) IF(BRINGAUTO_TESTS) ENABLE_TESTING() - INCLUDE(${CMAKE_CURRENT_SOURCE_DIR}/test/CMakeLists.txt) + ADD_SUBDIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/test) INCLUDE(CTest) ENDIF(BRINGAUTO_TESTS) @@ -85,5 +79,5 @@ IF (BRINGAUTO_PACKAGE) ENDIF () IF (BRINGAUTO_SAMPLES) - ADD_SUBDIRECTORY(${CMAKE_CURRENT_LIST_DIR}/examples/) + ADD_SUBDIRECTORY(${CMAKE_CURRENT_LIST_DIR}/examples) ENDIF () diff --git a/README.md b/README.md index 8e48b8f..a108fa6 100644 --- a/README.md +++ b/README.md @@ -21,13 +21,38 @@ AsyncFunctionExecutor executorProducer { Config { .isProducer = true, // decides the mode of the executor .defaultTimeout = std::chrono::seconds(1) // polling timeout (should only be used when producer) + .functionConfigurations = structures::FunctionConfigs { { + { 1, { std::chrono::seconds(2) }} + } } }, - FunctionList { std::tuple{ // list of all functions + FunctionList { // list of all functions FunctionAdd - } } + } }; ``` +#### Post initialization + +Before using any functions, connection needs to be established using the connect function: + +```cpp +// Returns -1 on a failed connection +int rc = executorProducer.connect(); +``` + +#### functionConfigurations + +The functionConfigurations parameter accepts an unordered map representing per function configurations. Syntax: + +```cpp +{ + { , { } } +} +``` + +Supported parameters: + - timeout: replaces the default timeout value for that function (in nanoseconds) + ### Producer Producer is the side calling functions and waiting for a response from the consumer. If timeout is provided in config, the function will throw if it doesn't execute in time. Example of function calling: @@ -79,27 +104,16 @@ If a producer expects a return value where returned bytes are used directly, the ## Requirements -- [cmlib](https://github.com/cmakelib/cmakelib) - [aeron](https://github.com/aeron-io/aeron) +- [cmlib](https://github.com/cmakelib/cmakelib) + - the CMLIB_DIR env value has to be set -### Aeron setup - -Build and install aeron to any folder. - -```bash -git clone https://github.com/aeron-io/aeron.git -cd aeron -git checkout 1.48.5 -./cppbuild/cppbuild -cd cppbuild/Release -cmake --install . --prefix -``` ## Build ```bash mkdir -p _build && cd _build -cmake ../ -DCMLIB_DIR= -DCMAKE_PREFIX_PATH= +cmake ../ make ``` diff --git a/cmake/Dependencies.cmake b/cmake/Dependencies.cmake index 6147af7..c0e8103 100644 --- a/cmake/Dependencies.cmake +++ b/cmake/Dependencies.cmake @@ -1,6 +1,5 @@ SET(CMAKE_FIND_USE_CMAKE_SYSTEM_PATH FALSE) -BA_PACKAGE_LIBRARY(nlohmann-json v3.10.5 NO_DEBUG ON) BA_PACKAGE_LIBRARY(aeron v1.48.6) IF (BRINGAUTO_TESTS) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index d679e08..92e3fd7 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1,5 +1,15 @@ CMAKE_MINIMUM_REQUIRED(VERSION 3.25 FATAL_ERROR) -ADD_EXECUTABLE(example main.cpp) +SET(CMAKE_CXX_STANDARD 23) -TARGET_LINK_LIBRARIES(example PUBLIC async-function-execution-shared) \ No newline at end of file +IF(NOT TARGET async-function-execution-shared) + MESSAGE(FATAL_ERROR "The async-function-execution-shared target was not found. Please build the example as part of the async-function-execution-shared project.") +ENDIF() + +ADD_EXECUTABLE(example_producer main_producer.cpp) +ADD_EXECUTABLE(example_consumer main_consumer.cpp) +ADD_EXECUTABLE(example_driver main_driver.cpp) + +TARGET_LINK_LIBRARIES(example_producer PUBLIC async-function-execution-shared) +TARGET_LINK_LIBRARIES(example_consumer PUBLIC async-function-execution-shared) +TARGET_LINK_LIBRARIES(example_driver PUBLIC async-function-execution-shared) diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..0e6ab59 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,22 @@ +# Async function execution example + +This folder contains a simple example of the usage of this project. 3 executables will be built: the aeron driver, the producer and the consumer. + +## Build + +Examples are built as part of the main project. Don't use the CMakeLists file in the examples folder. + +```bash +mkdir -p _build_example && cd _build_example +cmake ../ -DBRINGAUTO_SAMPLES=ON +make +``` + +## Run + +```bash +# Run the executables in this order +./example_driver +./example_consumer +./example_producer +``` \ No newline at end of file diff --git a/examples/main.cpp b/examples/main.cpp deleted file mode 100644 index 0e409bb..0000000 --- a/examples/main.cpp +++ /dev/null @@ -1,41 +0,0 @@ -#include - -#include -#include - - -using namespace bringauto::async_function_execution; - -FunctionDefinition ExampleFunc1 { - FunctionId { 1 }, - Return { std::string {} }, - Arguments { int {}, std::string {}, float {} } -}; - -FunctionDefinition ExampleFunc2 { - FunctionId { 2 }, - Return { std::string {} }, - Arguments { int {}, std::string {} } -}; - -FunctionDefinition ExampleFunc3 { - FunctionId { 3 }, - Return { std::string {} }, - Arguments { int {} } -}; - - -int main() { - AsyncFunctionExecutor executor { - Config { - .isProducer = true, - .defaultTimeout = std::chrono::seconds(1) - }, - FunctionList { std::tuple{ ExampleFunc1, ExampleFunc2, ExampleFunc3 } }, - }; - - std::cout << executor.callFunc(ExampleFunc1, 42, "Hello", 3.14f) << std::endl; - std::cout << executor.callFunc(ExampleFunc2, 100, "World") << std::endl; - std::cout << executor.callFunc(ExampleFunc3, 123) << std::endl; - return 0; -} diff --git a/examples/main_consumer.cpp b/examples/main_consumer.cpp new file mode 100644 index 0000000..e0a65a4 --- /dev/null +++ b/examples/main_consumer.cpp @@ -0,0 +1,86 @@ +#include +#include + + + +using namespace bringauto::async_function_execution; + +struct SerializableString final { + std::string value {}; + SerializableString() = default; + explicit SerializableString(std::string str) : value(std::move(str)) {} + + std::span serialize() const { + return std::span {reinterpret_cast(value.data()), value.size()}; + } + void deserialize(std::span bytes) { + value = std::string {reinterpret_cast(bytes.data()), bytes.size()}; + } +}; + +constexpr int ExampleFunc1Id = 1; +constexpr int ExampleFunc2Id = 2; +constexpr int ExampleFunc3Id = 3; + +FunctionDefinition ExampleFunc1 { + FunctionId { ExampleFunc1Id }, + Return { SerializableString {} }, + Arguments { int {}, SerializableString {}, float {} } +}; + +FunctionDefinition ExampleFunc2 { + FunctionId { ExampleFunc2Id }, + Return { SerializableString {} }, + Arguments { int {}, SerializableString {} } +}; + +FunctionDefinition ExampleFunc3 { + FunctionId { ExampleFunc3Id }, + Return { SerializableString {} }, + Arguments { int {} } +}; + + +int main() { + AsyncFunctionExecutor executor { + Config { + .isProducer = false, + }, + FunctionList { ExampleFunc1, ExampleFunc2, ExampleFunc3 }, + }; + + if (executor.connect() != 0) { + std::cerr << "Consumer: Failed to connect to executor" << std::endl; + return 1; + } + + while (true) { + auto [funcId, argBytes] = executor.pollFunction(); + + switch (funcId.value) { + case ExampleFunc1Id: { + auto [arg1, arg2, arg3] = executor.getFunctionArgs(ExampleFunc1, argBytes); + std::cout << "Consumer: Received Function 1 call with args (" << arg1 << ", " << arg2.value << ", " << arg3 << ")." << std::endl; + executor.sendReturnMessage(funcId, SerializableString{"Func 1 return value"}); + break; + } + case ExampleFunc2Id: { + auto [arg1, arg2] = executor.getFunctionArgs(ExampleFunc2, argBytes); + std::cout << "Consumer: Received Function 2 call with args (" << arg1 << ", " << arg2.value << ")." << std::endl; + executor.sendReturnMessage(funcId, SerializableString{"Func 2 return value"}); + break; + } + case ExampleFunc3Id: { + auto [arg1] = executor.getFunctionArgs(ExampleFunc3, argBytes); + std::cout << "Consumer: Received Function 3 call with args (" << arg1 << ")" << std::endl; + executor.sendReturnMessage(funcId, SerializableString{"Func 3 return value"}); + break; + } + default: + std::cerr << "Consumer: Unknown function ID received: " << static_cast(funcId.value) << std::endl; + throw std::runtime_error("Unknown function ID"); + } + } + + return 0; +} diff --git a/examples/main_driver.cpp b/examples/main_driver.cpp new file mode 100644 index 0000000..b39643c --- /dev/null +++ b/examples/main_driver.cpp @@ -0,0 +1,11 @@ +#include + + + +using namespace bringauto::async_function_execution; + +int main() { + AeronDriver driver; + driver.run(); + return 0; +} diff --git a/examples/main_producer.cpp b/examples/main_producer.cpp new file mode 100644 index 0000000..d3f27f9 --- /dev/null +++ b/examples/main_producer.cpp @@ -0,0 +1,59 @@ +#include + + +using namespace bringauto::async_function_execution; + +struct SerializableString final { + std::string value {}; + SerializableString() = default; + explicit SerializableString(std::string str) : value(std::move(str)) {} + + std::span serialize() const { + return std::span {reinterpret_cast(value.data()), value.size()}; + } + void deserialize(std::span bytes) { + value = std::string {reinterpret_cast(bytes.data()), bytes.size()}; + } +}; + +FunctionDefinition ExampleFunc1 { + FunctionId { 1 }, + Return { SerializableString {} }, + Arguments { int {}, SerializableString {}, float {} } +}; + +FunctionDefinition ExampleFunc2 { + FunctionId { 2 }, + Return { SerializableString {} }, + Arguments { int {}, SerializableString {} } +}; + +FunctionDefinition ExampleFunc3 { + FunctionId { 3 }, + Return { SerializableString {} }, + Arguments { int {} } +}; + + +int main() { + AsyncFunctionExecutor executor { + Config { + .isProducer = true, + .defaultTimeout = std::chrono::seconds(1) + }, + FunctionList { ExampleFunc1, ExampleFunc2, ExampleFunc3 }, + }; + + if (executor.connect() != 0) { + std::cerr << "Producer: Failed to connect to executor" << std::endl; + return 1; + } + + auto result1 = executor.callFunc(ExampleFunc1, 42, "Hello", 3.14f).value(); + std::cout << result1.value << std::endl; + auto result2 = executor.callFunc(ExampleFunc2, 100, "World").value(); + std::cout << result2.value << std::endl; + auto result3 = executor.callFunc(ExampleFunc3, 123).value(); + std::cout << result3.value << std::endl; + return 0; +} diff --git a/include/bringauto/async_function_execution/AeronDriver.hpp b/include/bringauto/async_function_execution/AeronDriver.hpp index df553bf..78100ea 100644 --- a/include/bringauto/async_function_execution/AeronDriver.hpp +++ b/include/bringauto/async_function_execution/AeronDriver.hpp @@ -24,13 +24,13 @@ class AeronDriver { * @brief Starts the Aeron Driver. * This method will block until the driver is stopped or an error occurs. */ - void run(); + void run() const; /** * @brief Checks if the Aeron Driver is running. * @return true if the driver is running, false otherwise. */ - bool isRunning() const; + [[nodiscard]] bool isRunning() const; /** * @brief Stops the Aeron Driver. diff --git a/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp b/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp index 5232514..429eedb 100644 --- a/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp +++ b/include/bringauto/async_function_execution/AsyncFunctionExecutor.hpp @@ -7,6 +7,7 @@ #include #include #include +#include @@ -19,13 +20,13 @@ namespace bringauto::async_function_execution { struct Config { bool isProducer = true; std::chrono::nanoseconds defaultTimeout = std::chrono::nanoseconds(0); - std::string_view functionConfigurations = ""; + structures::FunctionConfigs functionConfigurations {}; }; /** * @brief Unique identifier for a function in the AsyncFunctionExecutor. - * Supported range is 0-255 + * value: The function ID value. Supported range is 0-255. */ struct FunctionId { const uint8_t value; @@ -43,11 +44,12 @@ concept HasSerialize = requires(const T& t) { /** * @brief Structure representing the return type of function. + * value: The return type value. */ template struct Return { const T value; - constexpr Return(T &&val) : value(std::forward(val)) {} + constexpr explicit Return(T &&val) : value(std::forward(val)) {} }; @@ -56,22 +58,26 @@ struct Return { */ template<> struct Return { - constexpr Return() noexcept {} + constexpr Return() noexcept = default; }; /** * @brief Structure representing the argument types of a function. + * values: A tuple holding the types of the function arguments. */ template struct Arguments { const std::tuple values; - constexpr Arguments(Args &&...args) : values{std::forward(args)...} {} + constexpr explicit Arguments(Args &&...args) : values{std::forward(args)...} {} }; /** * @brief Definition of a function that can be called or responded to via AsyncFunctionExecutor. + * id: Unique identifier for the function. + * returnType: The return type of the function. + * argumentTypes: The argument types of the function. */ template struct FunctionDefinition { @@ -82,12 +88,48 @@ struct FunctionDefinition { /** - * @brief Helper structure to hold a list of function definitions. + * @brief Helper type trait to identify FunctionDefinition types. */ -template +template +struct is_function_definition : std::false_type {}; + + +/** + * @brief Specialization of is_function_definition for FunctionDefinition types. + */ +template +struct is_function_definition> : std::true_type {}; + + +/** + * @brief Concept to ensure a type is a FunctionDefinition. + */ +template +concept IsFunctionDefinition = requires { typename std::decay_t; } && + is_function_definition>::value; + + +/** + * @brief Helper structure used to store function definitions. + * functions: A tuple holding all the function definitions. + */ +template struct FunctionList { const std::tuple functions; - FunctionList(Funcs&&... funcs) : functions(std::forward(funcs)...) {} + explicit FunctionList(Funcs... funcs) : functions(std::move(funcs)...) {} +}; + + +/** + * @brief Enum class representing possible error states during function calls. + */ +enum class CallError { + InvalidExecutorType, // Called a producer-only function in consumer mode or vice versa + FunctionIdNotDefined, // Function ID is not defined in the FunctionList + ArgumentCountMismatch, // Number of expected arguments does not match + TimeoutOrNoResponse, // No response received within the timeout + FunctionIdMismatch, // Function ID does not match + FunctionCallInProgress // Function call is already in progress }; @@ -103,8 +145,9 @@ class AsyncFunctionExecutor { * * @param config Configuration for the async function executor. * @param functions A list of function definitions that the client can call or respond to. + * @param client Optional custom client implementing ClientInterface. If not provided, a default AeronClient is used. */ - AsyncFunctionExecutor(Config config, + AsyncFunctionExecutor(const Config& config, const FunctionList &functions, std::unique_ptr client = nullptr) : client_(nullptr), settings_(config.isProducer, config.defaultTimeout, config.functionConfigurations), functions_(functions) { @@ -117,9 +160,9 @@ class AsyncFunctionExecutor { ); } - for (const auto& [funcId, _] : settings_.functionConfigs) { + for (const auto &funcId: settings_.functionConfigs.getFunctionIds()) { if (!isFunctionDefined(FunctionId{funcId})) { - throw std::runtime_error("Warning: Function ID " + std::to_string(static_cast(funcId)) + " in configuration is not defined in FunctionList."); + throw std::runtime_error("Warning: Function ID " + std::to_string(funcId) + " in configuration is not defined in FunctionList."); } } }; @@ -130,21 +173,31 @@ class AsyncFunctionExecutor { /** * @brief Connects the client to the media driver and sets up communication channels. * Needs to be called before any function calls or polling. + * @param channelOffset Optional offset to add to all function channel IDs. Default is 0. + * Use this when multiple executors are used in the same process to avoid channel ID conflicts. + * + * @return Returns 0 on success, or a negative error code on failure. */ - void connect() { + int connect(const uint32_t channelOffset = 0) { + if (channelOffset > (UINT32_MAX / (MESSAGE_RETURN_CHANNEL_OFFSET * 10))) { + std::cerr << "Channel offset too large" << std::endl; + return -1; // Error: Channel offset too large + } + channelOffset_ = channelOffset * (MESSAGE_RETURN_CHANNEL_OFFSET * 10); + std::vector toProducer; std::vector fromProducer; - std::apply([&](auto&&... funcDefs) { - (toProducer.push_back(funcDefs.id.value + MESSAGE_RETURN_CHANNEL_OFFSET), ...); - (fromProducer.push_back(funcDefs.id.value), ...); - }, std::get<0>(functions_.functions)); + std::apply([&](const auto&... funcDefs) { + (toProducer.push_back(funcDefs.id.value + channelOffset_ + MESSAGE_RETURN_CHANNEL_OFFSET), ...); + (fromProducer.push_back(funcDefs.id.value + channelOffset_), ...); + (callInProgress_.emplace(funcDefs.id.value, false), ...); + }, functions_.functions); if (settings_.isProducer) { - client_->connect(toProducer, fromProducer); - } else { - client_->connect(fromProducer, toProducer); + return client_->connect(toProducer, fromProducer); } + return client_->connect(fromProducer, toProducer); } @@ -155,35 +208,39 @@ class AsyncFunctionExecutor { * @param function The function definition of which function to call. * @param args The arguments to pass to the function. * @return The return value of the function. If the return type contains some byte buffer, - * the memory is valid until the next call to callFunc(). + * the memory is valid until the next call to callFunc(). On error, returns a CallError enum value. */ template - Ret callFunc(const FunctionDefinition &function, CallArgs&&... args) { + auto callFunc(const FunctionDefinition &function, CallArgs&&... args) -> std::expected { if (!settings_.isProducer) { - throw std::runtime_error("Cannot call function in consumer mode"); + return std::unexpected(CallError::InvalidExecutorType); } if (sizeof...(CallArgs) < sizeof...(FArgs)) { - throw std::invalid_argument("Argument count mismatch"); + return std::unexpected(CallError::ArgumentCountMismatch); } if (!isFunctionDefined(function.id)) { - throw std::runtime_error("Function ID not defined"); + return std::unexpected(CallError::FunctionIdNotDefined); } - + if (callInProgress_[function.id.value].load()) { + return std::unexpected(CallError::FunctionCallInProgress); + } + + callInProgress_[function.id.value] = true; auto messageBytes = serializeArgs(function.id, args...); - client_->sendMessage(function.id.value, messageBytes); + client_->sendMessage(function.id.value + channelOffset_, messageBytes); + + auto timeout = settings_.functionConfigs.getConfig(function.id.value).timeout; + auto responseBytes = client_->waitForMessage(function.id.value + channelOffset_ + MESSAGE_RETURN_CHANNEL_OFFSET, + timeout == std::chrono::nanoseconds(0) ? settings_.defaultTimeout : timeout); - auto responseBytes = client_->waitForMessage(function.id.value + MESSAGE_RETURN_CHANNEL_OFFSET); if (responseBytes.empty()) { - throw std::runtime_error("No response received or timeout"); + callInProgress_[function.id.value] = false; + return std::unexpected(CallError::TimeoutOrNoResponse); } - - Ret response = deserializeReturn(function.id, responseBytes); - if constexpr (std::is_same_v) { - return; - } else { - return response; - } + auto response = deserializeReturn(function.id, responseBytes); + callInProgress_[function.id.value] = false; + return response; } @@ -199,7 +256,7 @@ class AsyncFunctionExecutor { return std::make_tuple(FunctionId{}, std::span{}); // Error: Cannot start polling in producer mode } - auto requestBytes = client_->waitForAnyMessage(); + const auto requestBytes = client_->waitForAnyMessage(); if (requestBytes.empty()) { return std::make_tuple(FunctionId{}, std::span{}); // Error: No message received or timeout } @@ -211,7 +268,7 @@ class AsyncFunctionExecutor { /** * @brief Deserializes function arguments from a byte span into a tuple of argument values. - * Can only be used in consumer mode. + * Can only be used in consumer mode. Throws on error. * * @param function The function definition corresponding to the arguments. * @param argBytes The byte span containing the serialized arguments. @@ -227,7 +284,7 @@ class AsyncFunctionExecutor { throw std::runtime_error("Function ID not defined"); } - if (argBytes.size() < 1) { + if (argBytes.empty()) { throw std::invalid_argument("Not enough data to read argument count"); } @@ -241,7 +298,7 @@ class AsyncFunctionExecutor { std::tuple args; auto extractArg = [&](auto &arg) { if (pos >= argBytes.size()) throw std::invalid_argument("Unexpected end of data while reading argument size"); - uint16_t len = argBytes[pos] | (static_cast(argBytes[pos + 1]) << 8); + const uint16_t len = argBytes[pos] | (static_cast(argBytes[pos + 1]) << 8); pos += 2; if (pos + len > argBytes.size()) throw std::invalid_argument("Unexpected end of data while reading argument content"); @@ -283,15 +340,15 @@ class AsyncFunctionExecutor { } auto messageBytes = serializeReturn(functionId, returnValue); - return client_->sendMessage(functionId.value + MESSAGE_RETURN_CHANNEL_OFFSET, messageBytes); + return client_->sendMessage(functionId.value + channelOffset_ + MESSAGE_RETURN_CHANNEL_OFFSET, messageBytes); } private: bool isFunctionDefined(const FunctionId &funcId) { bool found = false; - std::apply([&](auto&&... funcDefs) { + std::apply([&](const auto&... funcDefs) { ((funcDefs.id.value == funcId.value ? found = true : false), ...); - }, std::get<0>(functions_.functions)); + }, functions_.functions); return found; } @@ -299,7 +356,7 @@ class AsyncFunctionExecutor { std::span serializeArgs(const FunctionId &funcId, const Args&... args) { serializationBuffer_.clear(); std::size_t totalSize = 2; // Function ID + Argument count - ((totalSize += 2 + sizeof(args)), ...); // Each argument: size byte + data + ((totalSize += 2 + sizeof(args)), ...); // Each argument: size bytes + data serializationBuffer_.reserve(totalSize); serializationBuffer_.push_back(funcId.value); serializationBuffer_.push_back(static_cast(sizeof...(Args))); @@ -311,7 +368,7 @@ class AsyncFunctionExecutor { template std::span serializeReturn(const FunctionId &funcId, const T &returnValue) { serializationBuffer_.clear(); - std::size_t totalSize = 3 + sizeof(returnValue); + const std::size_t totalSize = 3 + sizeof(returnValue); serializationBuffer_.reserve(totalSize); serializationBuffer_.push_back(funcId.value); appendArg(serializationBuffer_, returnValue); @@ -326,38 +383,42 @@ class AsyncFunctionExecutor { if (bytes.size() > MAX_ARGUMENT_SIZE) { throw std::invalid_argument("Serialized data too large"); } - uint16_t size = static_cast(bytes.size()); + const auto size = static_cast(bytes.size()); buffer.push_back(static_cast(size & 0xFF)); - buffer.push_back(static_cast((size >> 8) & 0xFF)); + buffer.push_back(static_cast(size >> 8 & 0xFF)); buffer.insert(buffer.end(), bytes.begin(), bytes.end()); } else { static_assert(std::is_trivially_copyable_v, "Argument type must be trivially copyable"); - uint16_t size = static_cast(sizeof(arg)); + const auto size = static_cast(sizeof(arg)); buffer.push_back(static_cast(size & 0xFF)); - buffer.push_back(static_cast((size >> 8) & 0xFF)); + buffer.push_back(static_cast(size >> 8 & 0xFF)); buffer.insert(buffer.end(), reinterpret_cast(&arg), reinterpret_cast(&arg) + sizeof(T)); } } template - T deserializeReturn(const FunctionId &funcId, const std::span& bytes) { + auto deserializeReturn(const FunctionId &funcId, const std::span& bytes) -> std::expected { if(funcId.value != bytes[0]) { - throw std::invalid_argument("Function ID mismatch in return value"); + return std::unexpected(CallError::FunctionIdMismatch); } - - T value; - if constexpr (HasSerialize) { - value.deserialize(std::span {bytes.data() + 3, bytes.size() - 3}); + + if constexpr (std::is_same_v) { + return std::expected{}; } else { - std::memcpy(&value, bytes.data() + 3, sizeof(T)); + T value; + if constexpr (HasSerialize) { + value.deserialize(std::span {bytes.data() + 3, bytes.size() - 3}); + } else { + std::memcpy(&value, bytes.data() + 3, sizeof(T)); + } + return value; } - return value; } std::tuple> deserializeRequest(const std::span& bytes) { - if (bytes.size() < 1) { + if (bytes.empty()) { throw std::invalid_argument("Not enough data to deserialize request"); } FunctionId funcId{ bytes[0] }; @@ -368,9 +429,14 @@ class AsyncFunctionExecutor { /// Buffer used for serialization of messages. mutable std::vector serializationBuffer_; + /// Client used for communication. Can be a custom implementation of ClientInterface. std::unique_ptr client_; structures::Settings settings_; FunctionList functions_; + /// Map to track if a function call is in progress for a given function ID. + std::unordered_map callInProgress_{}; + /// Channel ID offset to apply to all function channel IDs. + uint32_t channelOffset_ {}; }; } diff --git a/include/bringauto/async_function_execution/Constants.hpp b/include/bringauto/async_function_execution/Constants.hpp index ecdfaed..47cb06c 100644 --- a/include/bringauto/async_function_execution/Constants.hpp +++ b/include/bringauto/async_function_execution/Constants.hpp @@ -6,10 +6,17 @@ namespace bringauto::async_function_execution { +/// Maximum number of fragments to process in a single poll operation. constexpr int POLL_FRAGMENTS_LIMIT = 10; -constexpr int MESSAGE_RETURN_CHANNEL_OFFSET = 1000; -constexpr int MAX_ARGUMENT_SIZE = 65535; +/// Offset added to function ID to determine the return message channel ID. +constexpr unsigned int MESSAGE_RETURN_CHANNEL_OFFSET = 1000; +/// Maximum size for serialized function arguments. +constexpr unsigned int MAX_ARGUMENT_SIZE = 65535; +/// Default Aeron connection string for communication over shared memory. constexpr std::string_view DEFAULT_AERON_CONNECTION = "aeron:ipc"; - + +/// Timeout duration for establishing stream connections. +constexpr std::chrono::milliseconds STREAM_CONNECTION_TIMEOUT = std::chrono::milliseconds(5000); + } diff --git a/include/bringauto/async_function_execution/TimeoutIdleStrategy.hpp b/include/bringauto/async_function_execution/TimeoutIdleStrategy.hpp index 39a1b92..46f4815 100644 --- a/include/bringauto/async_function_execution/TimeoutIdleStrategy.hpp +++ b/include/bringauto/async_function_execution/TimeoutIdleStrategy.hpp @@ -1,104 +1,68 @@ #pragma once -#include -#include - -#include #include -#include namespace bringauto::async_function_execution { -enum class IdleState { - NOT_IDLE = 0, - SPINNING = 1, - YIELDING = 2, - PARKING = 3 -}; - class TimeoutIdleStrategy { public: + /** + * @brief Constructs a TimeoutIdleStrategy instance. + * + * @param timeoutNs Optional default timeout duration for idling operations. If set to zero, no timeout is applied. + * @param maxSpinPeriodNs Maximum duration to spend in the SPINNING state. Default is 1 second. + * @param maxYieldPeriodNs Maximum duration to spend in the YIELDING state. Default is 2 seconds. + * @param minParkPeriodNs Minimum duration to sleep in the PARKING state. Default is 1 microsecond. + * @param maxParkPeriodNs Maximum duration to sleep in the PARKING state. Default is 1 millisecond. + */ explicit TimeoutIdleStrategy( - std::chrono::nanoseconds timeoutNs = std::chrono::nanoseconds(0), - std::chrono::nanoseconds maxSpinPeriodNs = std::chrono::duration(1000), - std::chrono::nanoseconds maxYieldPeriodNs = std::chrono::duration(2000), - std::chrono::nanoseconds minParkPeriodNs = std::chrono::nanoseconds(1000), - std::chrono::nanoseconds maxParkPeriodNs = std::chrono::duration(1) - ) : prePad_(), - maxSpinPeriodNs_(maxSpinPeriodNs), + const std::chrono::nanoseconds timeoutNs = std::chrono::nanoseconds(0), + const std::chrono::nanoseconds maxSpinPeriodNs = std::chrono::duration(1000), + const std::chrono::nanoseconds maxYieldPeriodNs = std::chrono::duration(2000), + const std::chrono::nanoseconds minParkPeriodNs = std::chrono::nanoseconds(1000), + const std::chrono::nanoseconds maxParkPeriodNs = std::chrono::duration(1) + ) : maxSpinPeriodNs_(maxSpinPeriodNs), maxYieldPeriodNs_(maxYieldPeriodNs), minParkPeriodNs_(minParkPeriodNs), maxParkPeriodNs_(maxParkPeriodNs), parkPeriodNs_(minParkPeriodNs), state_(IdleState::NOT_IDLE), - timeoutNs_(timeoutNs), - postPad_() {} - - int idle(const int workCount) { - if (workCount > 0) { - reset(); - return 0; - } - return idle(); - } - - void reset() { - parkPeriodNs_ = minParkPeriodNs_; - state_ = IdleState::NOT_IDLE; - startTime_ = std::chrono::steady_clock::time_point(); - } - - int idle() { - auto now = std::chrono::steady_clock::now(); - if (timeoutNs_ != std::chrono::nanoseconds(0)) { - if (startTime_ == std::chrono::steady_clock::time_point()) { - startTime_ = now; - } - if (std::chrono::duration_cast(now - startTime_) >= timeoutNs_) { - return -1; - } - } - - switch(state_) { - case IdleState::NOT_IDLE: - state_ = IdleState::SPINNING; - lastStateSwitchTime_ = now; - break; - - case IdleState::SPINNING: - aeron::concurrent::atomic::cpu_pause(); - if (std::chrono::duration_cast(now - lastStateSwitchTime_) >= maxSpinPeriodNs_) { - state_ = IdleState::YIELDING; - lastStateSwitchTime_ = now; - } - break; - - case IdleState::YIELDING: - std::this_thread::yield(); - if (std::chrono::duration_cast(now - lastStateSwitchTime_) >= maxYieldPeriodNs_) { - state_ = IdleState::PARKING; - parkPeriodNs_ = minParkPeriodNs_; - lastStateSwitchTime_ = now; - } - break; - - case IdleState::PARKING: - default: - std::this_thread::sleep_for(parkPeriodNs_); - parkPeriodNs_ = std::min(parkPeriodNs_ * 2, maxParkPeriodNs_); - break; - } - return 0; - } - - void setTimeout(std::chrono::nanoseconds timeoutNs) { - timeoutNs_ = timeoutNs; - } + timeoutNs_(timeoutNs) {} + + /** + * @brief Idles based on the current state and the provided timeout. + * + * @param workCount Number of work items processed since the last call. + * @param timeout Optional timeout duration. If not provided, the default timeout set in the + * configuration will be used. + * @return 0 if idling continues, -1 if the timeout has been reached. + */ + int idle(int workCount, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(0)); + + /** + * @brief Resets the idle strategy to its initial state. + */ + void reset(); + +private: + /** + * @brief Enumeration of idle states. + */ + enum class IdleState { + /// Default state, no idling. + NOT_IDLE = 0, + /// Spinning state, actively checking for work. + SPINNING = 1, + /// Yielding state, giving up the CPU for a short duration. + YIELDING = 2, + /// Parking state, sleeping for a longer duration. + PARKING = 3 + }; + + int idle(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(0)); -protected: - std::uint8_t prePad_[aeron::util::BitUtil::CACHE_LINE_LENGTH]; std::chrono::nanoseconds maxSpinPeriodNs_; std::chrono::nanoseconds maxYieldPeriodNs_; std::chrono::nanoseconds minParkPeriodNs_; @@ -108,7 +72,6 @@ class TimeoutIdleStrategy { std::chrono::nanoseconds timeoutNs_; std::chrono::steady_clock::time_point startTime_; std::chrono::steady_clock::time_point lastStateSwitchTime_; - std::uint8_t postPad_[aeron::util::BitUtil::CACHE_LINE_LENGTH]; }; } diff --git a/include/bringauto/async_function_execution/clients/AeronClient.hpp b/include/bringauto/async_function_execution/clients/AeronClient.hpp index 9797056..9de589d 100644 --- a/include/bringauto/async_function_execution/clients/AeronClient.hpp +++ b/include/bringauto/async_function_execution/clients/AeronClient.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -63,30 +64,46 @@ class AeronClient final : public ClientInterface { */ int connect(const std::vector& subscriptionIds, const std::vector& publicationIds) override { aeron_ = aeron::Aeron::connect(aeronContext_); - int64_t id; - for (const auto &pubId : publicationIds) { - id = aeron_->addPublication(aeronConnection_, pubId); - std::shared_ptr publication = aeron_->findPublication(id); - while (!publication) { - std::this_thread::yield(); - publication = aeron_->findPublication(id); - } - aeronPublications_[pubId] = publication; - } - for (const auto &subId : subscriptionIds) { - id = aeron_->addSubscription(aeronConnection_, subId); - std::shared_ptr subscription = aeron_->findSubscription(id); - while (!subscription) { - std::this_thread::yield(); - subscription = aeron_->findSubscription(id); + try { + for (const auto &pubId : publicationIds) { + id = aeron_->addPublication(aeronConnection_, pubId); + std::shared_ptr publication = aeron_->findPublication(id); + const auto deadline = std::chrono::steady_clock::now() + STREAM_CONNECTION_TIMEOUT; + while (!publication) { + if (std::chrono::steady_clock::now() > deadline) { + std::cerr << "Aeron connection error: Timeout while waiting for publication" << std::endl; + return -1; // Error: Timeout + } + std::this_thread::yield(); + publication = aeron_->findPublication(id); + } + aeronPublications_[pubId] = publication; } - aeronSubscriptions_[subId] = subscription; - aeronPolling_[subId] = false; + + for (const auto &subId : subscriptionIds) { + id = aeron_->addSubscription(aeronConnection_, subId); + std::shared_ptr subscription = aeron_->findSubscription(id); + const auto deadline = std::chrono::steady_clock::now() + STREAM_CONNECTION_TIMEOUT; + while (!subscription) { + if (std::chrono::steady_clock::now() > deadline) { + std::cerr << "Aeron connection error: Timeout while waiting for subscription" << std::endl; + return -1; // Error: Timeout + } + std::this_thread::yield(); + subscription = aeron_->findSubscription(id); + } + aeronSubscriptions_[subId] = subscription; + aeronPolling_[subId] = false; + } + } catch (const std::exception &e) { + std::cerr << "Aeron connection error: " << e.what() << std::endl; + return -1; // Error: Aeron connection failed } if (aeronPublications_.empty() || aeronSubscriptions_.empty()) { + std::cerr << "Aeron connection error: No publications or subscriptions available" << std::endl; return -1; // Error: No publications or subscriptions available } return 0; @@ -99,11 +116,16 @@ class AeronClient final : public ClientInterface { * @param channelId The channel ID to send the message to. * @param messageBytes The message bytes to send. * @return Returns a positive number on success, or a negative error code on failure. - * (NOT_CONNECTED = -1, BACK_PRESSURED = -2, ADMIN_ACTION = -3, PUBLICATION_CLOSED = -4) + * (NOT_CONNECTED = -1, BACK_PRESSURED = -2, ADMIN_ACTION = -3, PUBLICATION_CLOSED = -4) */ int sendMessage(const uint32_t channelId, std::span &messageBytes) override { - aeron::concurrent::AtomicBuffer srcBuffer(const_cast(messageBytes.data()), messageBytes.size()); - return aeronPublications_[channelId]->offer(srcBuffer, 0, messageBytes.size()); + const aeron::concurrent::AtomicBuffer srcBuffer(const_cast(messageBytes.data()), messageBytes.size()); + const auto it = aeronPublications_.find(channelId); + if (it == aeronPublications_.end()) { + std::cerr << "Aeron send error: Channel ID not found" << std::endl; + return -1; // Error: Channel ID not found + } + return it->second->offer(srcBuffer, 0, messageBytes.size()); }; @@ -111,13 +133,19 @@ class AeronClient final : public ClientInterface { * @brief Retrieves the last received message from Aeron. * * @param channelId The channel ID to wait for a message from. + * @param timeout Maximum time to wait for a message before timing out. * @return Bytes of the last message received. Returns an empty span on timeout or error. */ - std::span waitForMessage(const uint32_t channelId) override { + std::span waitForMessage(const uint32_t channelId, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(0)) override { + const auto it = aeronSubscriptions_.find(channelId); + if (it == aeronSubscriptions_.end()) { + std::cerr << "Aeron wait error: Channel ID not found" << std::endl; + return {}; // Error: Channel ID not found + } aeronPolling_[channelId] = true; while (aeronPolling_[channelId]) { - const int fragmentsRead = aeronSubscriptions_[channelId]->poll(*aeronHandler_, 10); - if(aeronIdleStrategy_->idle(fragmentsRead) != 0) { + const int fragmentsRead = it->second->poll(*aeronHandler_, 10); + if(aeronIdleStrategy_->idle(fragmentsRead, timeout) != 0) { aeronIdleStrategy_->reset(); return {}; // Error: Aeron message wait timed out } @@ -176,4 +204,4 @@ class AeronClient final : public ClientInterface { std::unordered_map aeronMessages_ {}; }; -} \ No newline at end of file +} diff --git a/include/bringauto/async_function_execution/clients/ClientInterface.hpp b/include/bringauto/async_function_execution/clients/ClientInterface.hpp index 23b9ca5..e7c4f0e 100644 --- a/include/bringauto/async_function_execution/clients/ClientInterface.hpp +++ b/include/bringauto/async_function_execution/clients/ClientInterface.hpp @@ -1,8 +1,8 @@ #pragma once #include -#include #include +#include @@ -29,15 +29,16 @@ class ClientInterface { * @param messageBytes The bytes of the message to send. * @return Returns 0 on success, or a negative error code on failure. */ - virtual int sendMessage(const uint32_t channelId, std::span &messageBytes) = 0; + virtual int sendMessage(uint32_t channelId, std::span &messageBytes) = 0; /** * @brief Waits for a message from the specified channel ID. * * @param channelId The channel ID to wait for a message from. + * @param timeout Maximum time to wait for a message before timing out. * @return Bytes of the last message received. Returns an empty span on timeout or error. */ - virtual std::span waitForMessage(const uint32_t channelId) = 0; + virtual std::span waitForMessage(uint32_t channelId, std::chrono::nanoseconds timeout) = 0; /** * @brief Waits for any message from all channels. diff --git a/include/bringauto/async_function_execution/structures/Settings.hpp b/include/bringauto/async_function_execution/structures/Settings.hpp index 11a270e..fd538f1 100644 --- a/include/bringauto/async_function_execution/structures/Settings.hpp +++ b/include/bringauto/async_function_execution/structures/Settings.hpp @@ -1,7 +1,5 @@ #pragma once -#include - #include #include @@ -17,6 +15,37 @@ struct FunctionConfig { std::chrono::nanoseconds timeout = std::chrono::nanoseconds(0); }; +/** + * @brief Wrapper for multiple FunctionConfig instances, keyed by FunctionId. + * configs: Map of FunctionId to FunctionConfig. + */ +struct FunctionConfigs { + FunctionConfigs() = default; + explicit FunctionConfigs(std::unordered_map configs) : configs(std::move(configs)) {}; + + FunctionConfig getConfig(const uint8_t functionId) const { + if (configs.find(functionId) != configs.end()) { + return configs.at(functionId); + } + return FunctionConfig{}; + } + + void setConfig(const uint8_t functionId, const FunctionConfig& config) { + configs[functionId] = config; + } + + std::vector getFunctionIds() const { + std::vector functionIds; + for (const auto& [funcId, _] : configs) { + functionIds.push_back(funcId); + } + return functionIds; + } + + private: + std::unordered_map configs; +}; + /** * @brief Configuration settings for the AsyncFunctionExecutor. * isProducer: If true, the instance acts as a producer (sending requests). @@ -27,27 +56,11 @@ struct FunctionConfig { struct Settings { const bool isProducer = true; const std::chrono::nanoseconds defaultTimeout = std::chrono::nanoseconds(0); - std::unordered_map functionConfigs; + FunctionConfigs functionConfigs; - Settings(bool isProducer, std::chrono::nanoseconds defaultTimeout, std::string_view funcConfs = "") - : isProducer(isProducer), defaultTimeout(defaultTimeout) { - if (funcConfs.empty()) { - return; - } - - const auto configs = nlohmann::json::parse(funcConfs, nullptr, false); - for(const auto& [key, value] : configs.items()) { - try { - uint8_t funcId = static_cast(std::stoi(key)); - FunctionConfig funcConfig { - .timeout = std::chrono::nanoseconds(value["timeout"].get()) - }; - functionConfigs[funcId] = funcConfig; - } catch (const std::exception &e) { - std::cerr << "Error parsing function configuration for key " << key << ": " << e.what() << std::endl; - } - } - } + Settings(const bool isProducer, const std::chrono::nanoseconds defaultTimeout, + const FunctionConfigs& functionConfigs = {}) + : isProducer(isProducer), defaultTimeout(defaultTimeout), functionConfigs(functionConfigs) {} }; } diff --git a/source/bringauto/async_function_execution/AeronDriver.cpp b/source/bringauto/async_function_execution/AeronDriver.cpp index 4ab869f..e622c76 100644 --- a/source/bringauto/async_function_execution/AeronDriver.cpp +++ b/source/bringauto/async_function_execution/AeronDriver.cpp @@ -1,6 +1,6 @@ #include -#include +#include @@ -26,7 +26,7 @@ AeronDriver::AeronDriver() { signal(SIGINT, signalHandler); signal(SIGTERM, signalHandler); aeron_driver_context_init(&driverContext_); - aeron_driver_context_set_driver_termination_hook(driverContext_, terminationHook, NULL); + aeron_driver_context_set_driver_termination_hook(driverContext_, terminationHook, nullptr); driverContext_->agent_on_start_func_delegate = driverContext_->agent_on_start_func; driverContext_->agent_on_start_state_delegate = driverContext_->agent_on_start_state; aeron_driver_context_set_agent_on_start_function(driverContext_, aeron_set_thread_affinity_on_start, driverContext_); @@ -34,7 +34,7 @@ AeronDriver::AeronDriver() { } -void AeronDriver::run() { +void AeronDriver::run() const { aeron_driver_start(driver_, true); while (isRunning()) { aeron_driver_main_idle_strategy(driver_, aeron_driver_main_do_work(driver_)); diff --git a/source/bringauto/async_function_execution/TimeoutIdleStrategy.cpp b/source/bringauto/async_function_execution/TimeoutIdleStrategy.cpp new file mode 100644 index 0000000..96ea857 --- /dev/null +++ b/source/bringauto/async_function_execution/TimeoutIdleStrategy.cpp @@ -0,0 +1,69 @@ +#include + +#include + +#include + + + +namespace bringauto::async_function_execution { + +int TimeoutIdleStrategy::idle(const int workCount, std::chrono::nanoseconds timeout) { + if (workCount > 0) { + reset(); + return 0; + } + return idle(timeout); +} + +void TimeoutIdleStrategy::reset() { + parkPeriodNs_ = minParkPeriodNs_; + state_ = IdleState::NOT_IDLE; + startTime_ = std::chrono::steady_clock::time_point(); +} + +int TimeoutIdleStrategy::idle(std::chrono::nanoseconds timeout) { + auto timeoutNs = timeout != std::chrono::nanoseconds(0) ? timeout : timeoutNs_; + auto now = std::chrono::steady_clock::now(); + if (timeoutNs != std::chrono::nanoseconds(0)) { + if (startTime_ == std::chrono::steady_clock::time_point()) { + startTime_ = now; + } + if (std::chrono::duration_cast(now - startTime_) >= timeoutNs) { + return -1; + } + } + + switch(state_) { + case IdleState::NOT_IDLE: + state_ = IdleState::SPINNING; + lastStateSwitchTime_ = now; + break; + + case IdleState::SPINNING: + aeron::concurrent::atomic::cpu_pause(); + if (std::chrono::duration_cast(now - lastStateSwitchTime_) >= maxSpinPeriodNs_) { + state_ = IdleState::YIELDING; + lastStateSwitchTime_ = now; + } + break; + + case IdleState::YIELDING: + std::this_thread::yield(); + if (std::chrono::duration_cast(now - lastStateSwitchTime_) >= maxYieldPeriodNs_) { + state_ = IdleState::PARKING; + parkPeriodNs_ = minParkPeriodNs_; + lastStateSwitchTime_ = now; + } + break; + + case IdleState::PARKING: + default: + std::this_thread::sleep_for(parkPeriodNs_); + parkPeriodNs_ = std::min(parkPeriodNs_ * 2, maxParkPeriodNs_); + break; + } + return 0; +} + +} diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index faf9a99..a825908 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -1,10 +1,10 @@ CMAKE_MINIMUM_REQUIRED(VERSION 3.25) PROJECT(async-function-execution) -SET(CMAKE_CXX_STANDARD 20) +SET(CMAKE_CXX_STANDARD 23) -IF(CMAKE_BUILD_TYPE STREQUAL "Debug") - ADD_DEFINITIONS(-DDEBUG) +IF(NOT TARGET async-function-execution-shared) + MESSAGE(FATAL_ERROR "The async-function-execution-shared target was not found. Please build the tests as part of the async-function-execution-shared project.") ENDIF() # GTest static library is not compiled as pic diff --git a/test/README.md b/test/README.md index 71c3e88..864634a 100644 --- a/test/README.md +++ b/test/README.md @@ -6,9 +6,11 @@ ## Build +Tests are built as part of the main project. Don't use the CMakeLists file in the test folder. + ```bash mkdir -p _build_tests && cd _build_tests -cmake ../ -DCMLIB_DIR= -DBRINGAUTO_TESTS=ON -DCMAKE_PREFIX_PATH= +cmake ../ -DBRINGAUTO_TESTS=ON make ``` diff --git a/test/include/AsyncFunctionExecutorTests.hpp b/test/include/AsyncFunctionExecutorTests.hpp index 97cadc6..92ffc17 100644 --- a/test/include/AsyncFunctionExecutorTests.hpp +++ b/test/include/AsyncFunctionExecutorTests.hpp @@ -12,7 +12,7 @@ namespace baafe = bringauto::async_function_execution; struct SerializableString final { std::string value {}; SerializableString() = default; - SerializableString(std::string str) : value(std::move(str)) {} + explicit SerializableString(std::string str) : value(std::move(str)) {} std::span serialize() const { return std::span {reinterpret_cast(value.data()), value.size()}; @@ -52,41 +52,47 @@ baafe::FunctionDefinition FunctionNoArgs { baafe::Arguments { } }; +baafe::FunctionDefinition FunctionWait { + baafe::FunctionId { 6 }, + baafe::Return { }, + baafe::Arguments { } +}; + baafe::AsyncFunctionExecutor executorProducer { baafe::Config { .isProducer = true, - .defaultTimeout = std::chrono::seconds(1) + .defaultTimeout = std::chrono::seconds(1), + .functionConfigurations = baafe::structures::FunctionConfigs { { + { FunctionAdd.id.value, { std::chrono::nanoseconds(1000000) } }, + { FunctionMultiply.id.value, { std::chrono::nanoseconds(2000000) } }, + { FunctionReturnSame.id.value, { std::chrono::nanoseconds(3000000) } }, + { FunctionReturnSameString.id.value, { std::chrono::nanoseconds(4000000) } }, + { FunctionNoArgs.id.value, { std::chrono::nanoseconds(5000000) } }, + { FunctionWait.id.value, { std::chrono::nanoseconds(6000000) } } + } } }, - baafe::FunctionList { std::tuple{ + baafe::FunctionList { FunctionAdd, FunctionMultiply, FunctionReturnSame, FunctionReturnSameString, - FunctionNoArgs - } }, + FunctionNoArgs, + FunctionWait + }, std::make_unique() }; baafe::AsyncFunctionExecutor executorConsumer { baafe::Config { - .isProducer = false, - .functionConfigurations = R"( - { - "1": { "timeout": 1000000 }, - "2": { "timeout": 2000000 }, - "3": { "timeout": 3000000 }, - "4": { "timeout": 4000000 }, - "5": { "timeout": 5000000 } - } - )" + .isProducer = false }, - baafe::FunctionList { std::tuple{ + baafe::FunctionList { FunctionAdd, FunctionMultiply, FunctionReturnSame, FunctionReturnSameString, FunctionNoArgs - } }, + }, std::make_unique() }; @@ -94,8 +100,9 @@ baafe::AsyncFunctionExecutor executorConsumer { class AsyncFunctionExecutorTests : public ::testing::Test { protected: static void SetUpTestSuite() { - executorProducer.connect(); - executorConsumer.connect(); + // Connect both executors with an arbitrary channel offset of 42 + executorProducer.connect(42); + executorConsumer.connect(42); } void SetUp() override {} diff --git a/test/include/MockClient.hpp b/test/include/MockClient.hpp index a7ae425..6b31eea 100644 --- a/test/include/MockClient.hpp +++ b/test/include/MockClient.hpp @@ -9,7 +9,7 @@ -class MockClient : public bringauto::async_function_execution::clients::ClientInterface { +class MockClient final : public bringauto::async_function_execution::clients::ClientInterface { public: MockClient() = default; ~MockClient() = default; @@ -20,14 +20,14 @@ class MockClient : public bringauto::async_function_execution::clients::ClientIn } int sendMessage(const uint32_t channelId, std::span &messageBytes) override { - if (channelId > 1000) { + if (channelId > 421000) { // Validate that this is a return message if (messageBytes.size() != 3 + sizeof(int)) { std::cerr << "Invalid return message size: " << messageBytes.size() << std::endl; return -1; // Error: Invalid return message size } - if (messageBytes[0] != static_cast(channelId - 1000) || messageBytes[1] != sizeof(int)) { + if (messageBytes[0] != static_cast(channelId - 421000) || messageBytes[1] != sizeof(int)) { std::cerr << "Invalid return message format." << std::endl; return -2; // Error: Invalid return message format } @@ -42,7 +42,7 @@ class MockClient : public bringauto::async_function_execution::clients::ClientIn return 0; } - uint8_t funcId = messageBytes[0]; + const uint8_t funcId = messageBytes[0]; if (funcId == 4) { // FunctionReturnSameString auto stringArgs = deserializeStringRequest(messageBytes); @@ -52,17 +52,24 @@ class MockClient : public bringauto::async_function_execution::clients::ClientIn return 0; } - auto args = deserializeIntRequest(messageBytes); + if (funcId == 6) { // FunctionWait + // 250 ms because the test waits for 200 ms before checking call in progress + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + serializeVoidResponse(funcId); + return 0; + } + + const auto args = deserializeIntRequest(messageBytes); switch (funcId) { case 1: // FunctionAdd if (args.size() == 3) { - int sum = args[0] + args[1] + args[2]; + const int sum = args[0] + args[1] + args[2]; serializeIntResponse(funcId, sum); } break; case 2: // FunctionMultiply if (args.size() == 3) { - int product = args[0] * args[1] * args[2]; + const int product = args[0] * args[1] * args[2]; serializeIntResponse(funcId, product); } break; @@ -77,44 +84,46 @@ class MockClient : public bringauto::async_function_execution::clients::ClientIn return 0; }; - std::span waitForMessage(const uint32_t channelId) override { - (void)channelId; + std::span waitForMessage(const uint32_t channelId, const std::chrono::nanoseconds timeout) override { + // Test if the timeout is correctly set for each function + EXPECT_EQ((channelId - 421000) * 1000000, timeout.count()); + if (messageBuffer_.empty()) { return {}; } - return std::span(messageBuffer_.data(), messageBuffer_.size()); + return {messageBuffer_.data(), messageBuffer_.size()}; } /// Will always return a message for FunctionAdd with arguments (10, 20, 30) std::span waitForAnyMessage() override { messageBuffer_.clear(); messageBuffer_.reserve(2 + 3 * (2 + sizeof(int))); // Function ID + Arg count + 3 args (size + data) - messageBuffer_.push_back(static_cast(1)); // Function ID - messageBuffer_.push_back(static_cast(3)); // Argument count + messageBuffer_.push_back(1); // Function ID + messageBuffer_.push_back(3); // Argument count int arg1 = 10; int arg2 = 20; int arg3 = 30; - messageBuffer_.push_back(static_cast(sizeof(int) & 0xFF)); - messageBuffer_.push_back(static_cast((sizeof(int) >> 8) & 0xFF)); + messageBuffer_.push_back(sizeof(int) & 0xFF); + messageBuffer_.push_back(sizeof(int) >> 8 & 0xFF); messageBuffer_.insert(messageBuffer_.end(), reinterpret_cast(&arg1), reinterpret_cast(&arg1) + sizeof(int)); - messageBuffer_.push_back(static_cast(sizeof(int) & 0xFF)); - messageBuffer_.push_back(static_cast((sizeof(int) >> 8) & 0xFF)); + messageBuffer_.push_back(sizeof(int) & 0xFF); + messageBuffer_.push_back(sizeof(int) >> 8 & 0xFF); messageBuffer_.insert(messageBuffer_.end(), reinterpret_cast(&arg2), reinterpret_cast(&arg2) + sizeof(int)); - messageBuffer_.push_back(static_cast(sizeof(int) & 0xFF)); - messageBuffer_.push_back(static_cast((sizeof(int) >> 8) & 0xFF)); + messageBuffer_.push_back(sizeof(int) & 0xFF); + messageBuffer_.push_back(sizeof(int) >> 8 & 0xFF); messageBuffer_.insert(messageBuffer_.end(), reinterpret_cast(&arg3), reinterpret_cast(&arg3) + sizeof(int)); return {messageBuffer_.data(), messageBuffer_.size()}; } private: /// Deserializes a request message into function ID and argument values the same way that AsyncFunctionExecutor does. - std::vector deserializeIntRequest(std::span &bytes) { + std::vector deserializeIntRequest(const std::span &bytes) { size_t pos = 1; - uint8_t argCount = bytes[pos++]; + const uint8_t argCount = bytes[pos++]; std::vector args; for (uint8_t i = 0; i < argCount; ++i) { - uint16_t argSize = bytes[pos] | (static_cast(bytes[pos + 1]) << 8); + const uint16_t argSize = bytes[pos] | (static_cast(bytes[pos + 1]) << 8); pos += 2; int argValue; std::memcpy(&argValue, bytes.data() + pos, sizeof(int)); @@ -125,13 +134,13 @@ class MockClient : public bringauto::async_function_execution::clients::ClientIn } /// Deserializes a request message with string arguments. - std::vector deserializeStringRequest(std::span &bytes) { + std::vector deserializeStringRequest(const std::span &bytes) { size_t pos = 1; - uint8_t argCount = bytes[pos++]; + const uint8_t argCount = bytes[pos++]; std::vector args; for (uint8_t i = 0; i < argCount; ++i) { - uint16_t argSize = bytes[pos] | (static_cast(bytes[pos + 1]) << 8); + const uint16_t argSize = bytes[pos] | (static_cast(bytes[pos + 1]) << 8); pos += 2; std::string argValue(reinterpret_cast(bytes.data() + pos), argSize); args.push_back(argValue); @@ -141,29 +150,35 @@ class MockClient : public bringauto::async_function_execution::clients::ClientIn } /// Serializes a response message the same way that AsyncFunctionExecutor does. - void serializeIntResponse(uint8_t funcId, int returnValue) { + void serializeIntResponse(const uint8_t funcId, const int returnValue) { std::vector buffer; buffer.push_back(funcId); - buffer.push_back(static_cast(sizeof(int))); + buffer.push_back(sizeof(int)); buffer.resize(3 + sizeof(int)); std::memcpy(buffer.data() + 3, &returnValue, sizeof(int)); messageBuffer_ = buffer; } /// Serializes a response message with a string return value. - void serializeStringResponse(uint8_t funcId, const std::string &data) { + void serializeStringResponse(const uint8_t funcId, const std::string &data) { std::vector buffer; buffer.push_back(funcId); if (data.size() > 65535) { throw std::invalid_argument("Data too large to serialize in MockClient"); } - uint16_t size = static_cast(data.size()); + const auto size = static_cast(data.size()); buffer.push_back(static_cast(size & 0xFF)); - buffer.push_back(static_cast((size >> 8) & 0xFF)); + buffer.push_back(static_cast(size >> 8 & 0xFF)); buffer.insert(buffer.end(), data.begin(), data.end()); messageBuffer_ = buffer; } + /// Serializes a response message for a void return type. + void serializeVoidResponse(const uint8_t funcId) { + std::vector buffer; + buffer.push_back(funcId); + messageBuffer_ = buffer; + } std::vector messageBuffer_; }; diff --git a/test/integration_tests/main_segfault_consumer.cpp b/test/integration_tests/main_segfault_consumer.cpp index 8b5ec35..40f7e1b 100644 --- a/test/integration_tests/main_segfault_consumer.cpp +++ b/test/integration_tests/main_segfault_consumer.cpp @@ -9,7 +9,7 @@ namespace baafe = bringauto::async_function_execution; struct SerializableString final { std::string value {}; SerializableString() = default; - SerializableString(std::string str) : value(std::move(str)) {} + explicit SerializableString(std::string str) : value(std::move(str)) {} std::span serialize() const { return std::span {reinterpret_cast(value.data()), value.size()}; @@ -29,14 +29,18 @@ baafe::AsyncFunctionExecutor executorConsumer { baafe::Config { .isProducer = false }, - baafe::FunctionList { std::tuple{ + baafe::FunctionList { Function - } } + } }; int main() { - executorConsumer.connect(); + if (executorConsumer.connect() != 0) { + std::cerr << "Consumer: Failed to connect to executor" << std::endl; + return 1; + } + std::cout << "Consumer connected." << std::endl; while (true) { @@ -57,4 +61,4 @@ int main() { } return 0; -} \ No newline at end of file +} diff --git a/test/integration_tests/main_segfault_producer.cpp b/test/integration_tests/main_segfault_producer.cpp index 0cdfc1e..bab7c82 100644 --- a/test/integration_tests/main_segfault_producer.cpp +++ b/test/integration_tests/main_segfault_producer.cpp @@ -9,7 +9,8 @@ namespace baafe = bringauto::async_function_execution; struct SerializableString final { std::string value {}; SerializableString() = default; - SerializableString(std::string str) : value(std::move(str)) {} + + explicit SerializableString(std::string str) : value(std::move(str)) {} std::span serialize() const { return std::span {reinterpret_cast(value.data()), value.size()}; @@ -30,9 +31,9 @@ baafe::AsyncFunctionExecutor executorProducer { .isProducer = true, .defaultTimeout = std::chrono::seconds(1) }, - baafe::FunctionList { std::tuple{ + baafe::FunctionList { Function - } } + } }; int main() { @@ -50,15 +51,18 @@ int main() { return -1; } - executorProducer.connect(); - std::cout << "Producer connected." << std::endl; + if (executorProducer.connect() != 0) { + std::cerr << "Producer: Failed to connect to executor" << std::endl; + return 1; + } + std::cout << "Producer connected." << std::endl; std::cout << "Turn on the consumer and press Enter to continue..." << std::endl; std::cin.get(); // Producer calls Function - SerializableString ret = executorProducer.callFunc(Function, "Hello, World!"); - + SerializableString ret = executorProducer.callFunc(Function, SerializableString{"Hello, World!"}).value(); + // Short delay while a segfault is forced in the consumer std::this_thread::sleep_for(std::chrono::seconds(1)); @@ -74,4 +78,4 @@ int main() { std::cout << "Aeron Driver has been stopped." << std::endl; std::cout << "Test completed." << std::endl; return 0; -} \ No newline at end of file +} diff --git a/test/integration_tests/main_simple_function.cpp b/test/integration_tests/main_simple_function.cpp index 053d0ce..1570372 100644 --- a/test/integration_tests/main_simple_function.cpp +++ b/test/integration_tests/main_simple_function.cpp @@ -17,18 +17,18 @@ baafe::AsyncFunctionExecutor executorProducer { .isProducer = true, .defaultTimeout = std::chrono::seconds(1) }, - baafe::FunctionList { std::tuple{ + baafe::FunctionList { FunctionAdd - } } + } }; baafe::AsyncFunctionExecutor executorConsumer { baafe::Config { .isProducer = false }, - baafe::FunctionList { std::tuple{ + baafe::FunctionList { FunctionAdd - } } + } }; void consumerLoop() { @@ -62,8 +62,14 @@ int main() { return -1; } - executorProducer.connect(); - executorConsumer.connect(); + if (executorProducer.connect() != 0) { + std::cerr << "Producer: Failed to connect to executor" << std::endl; + return 1; + } + if (executorConsumer.connect() != 0) { + std::cerr << "Consumer: Failed to connect to executor" << std::endl; + return 1; + } std::cout << "Both producer and consumer connected." << std::endl; // Start consumer loop in a separate thread @@ -71,7 +77,7 @@ int main() { std::this_thread::sleep_for(std::chrono::seconds(1)); // Give consumer a moment to start // Producer calls FunctionAdd - int sum = executorProducer.callFunc(FunctionAdd, 10, 20, 30); + int sum = executorProducer.callFunc(FunctionAdd, 10, 20, 30).value(); std::cout << "Producer: FunctionAdd(10, 20, 30) returned: " << sum << std::endl; if (sum != 60) { std::cerr << "Unexpected sum result!" << std::endl; @@ -88,4 +94,4 @@ int main() { std::cout << "Aeron Driver has been stopped." << std::endl; std::cout << "Test completed." << std::endl; return 0; -} \ No newline at end of file +} diff --git a/test/source/AsyncFunctionExecutorTests.cpp b/test/source/AsyncFunctionExecutorTests.cpp index 822069d..f8213c5 100644 --- a/test/source/AsyncFunctionExecutorTests.cpp +++ b/test/source/AsyncFunctionExecutorTests.cpp @@ -6,11 +6,11 @@ * @brief Tests calling different functions and receiving correct return values. */ TEST_F(AsyncFunctionExecutorTests, CallDifferentFunctions) { - auto result = executorProducer.callFunc(FunctionAdd, 1, 2, 3); + auto result = executorProducer.callFunc(FunctionAdd, 1, 2, 3).value(); ASSERT_EQ(result, 6); - result = executorProducer.callFunc(FunctionMultiply, 2, 3, 4); + result = executorProducer.callFunc(FunctionMultiply, 2, 3, 4).value(); ASSERT_EQ(result, 24); - result = executorProducer.callFunc(FunctionReturnSame, 42); + result = executorProducer.callFunc(FunctionReturnSame, 42).value(); ASSERT_EQ(result, 42); } @@ -19,7 +19,7 @@ TEST_F(AsyncFunctionExecutorTests, CallDifferentFunctions) { * @brief Tests calling a function with a serializable string argument and return value. */ TEST_F(AsyncFunctionExecutorTests, CallFunctionWithSerializableString) { - auto result = executorProducer.callFunc(FunctionReturnSameString, SerializableString{"Hello, World!"}); + auto result = executorProducer.callFunc(FunctionReturnSameString, SerializableString{"Hello, World!"}).value(); ASSERT_EQ(result.value, "Hello, World!"); } @@ -73,8 +73,8 @@ TEST_F(AsyncFunctionExecutorTests, GetFunctionArgsInvalidData) { * @brief Tests checks for producer/consumer mode restrictions and error handling. */ TEST_F(AsyncFunctionExecutorTests, CallInvalidFunctionsProducerConsumer) { - // Consumer using callFunc should throw - ASSERT_THROW(executorConsumer.callFunc(FunctionAdd, 1, 2, 3), std::runtime_error); + // Consumer using callFunc returns InvalidExecutorType error + ASSERT_EQ(executorConsumer.callFunc(FunctionAdd, 1, 2, 3).error(), baafe::CallError::InvalidExecutorType); // Producer using pollFunction should return empty tuple auto [funcId, argBytes] = executorProducer.pollFunction(); @@ -100,7 +100,7 @@ TEST_F(AsyncFunctionExecutorTests, UndefinedFunction) { baafe::Arguments { int {}, int {} } }; - ASSERT_THROW(executorProducer.callFunc(FunctionUndefined, 1, 2), std::runtime_error); + ASSERT_EQ(executorProducer.callFunc(FunctionUndefined, 1, 2).error(), baafe::CallError::FunctionIdNotDefined); ASSERT_THROW(executorConsumer.getFunctionArgs(FunctionUndefined, std::span {}), std::runtime_error); int ret = executorConsumer.sendReturnMessage(FunctionUndefined.id, 42); ASSERT_EQ(ret, -1); @@ -115,16 +115,69 @@ TEST_F(AsyncFunctionExecutorTests, ConfigForUndefinedFunction) { ASSERT_THROW(baafe::AsyncFunctionExecutor( baafe::Config { .isProducer = true, - .functionConfigurations = R"( - { - "99": { "timeout": 1000000 } - } - )" + .functionConfigurations = baafe::structures::FunctionConfigs { { + { 99, { std::chrono::nanoseconds(1000000) } } + } } }, - baafe::FunctionList { std::tuple{ + baafe::FunctionList { FunctionAdd, FunctionMultiply - } }, + }, std::make_unique() ), std::runtime_error); } + + +/** + * @brief Tests calling a function with argument size over 2^16 bytes. + */ +TEST_F(AsyncFunctionExecutorTests, ArgumentTooLarge) { + // Create a very large string argument + std::string largeString(70000, 'A'); + ASSERT_THROW(executorProducer.callFunc(FunctionReturnSameString, SerializableString{largeString}), std::invalid_argument); +} + + +/** + * @brief Tests the behavior when a function call is already in progress. + */ +TEST_F(AsyncFunctionExecutorTests, FunctionCallInProgress) { + // Start a thread that calls FunctionWait, which does not expect a response + std::thread waitThread([this]() { + executorProducer.callFunc(FunctionWait); + }); + // Give the thread a moment to start and set the callInProgress flag + // 200 ms is chosen to account for worst case scenario thread creation time plus some extra time for function execution + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + // Now, calling FunctionWait again should return FunctionCallInProgress error + EXPECT_EQ(executorProducer.callFunc(FunctionWait).error(), baafe::CallError::FunctionCallInProgress); + waitThread.join(); + ASSERT_NE(executorProducer.callFunc(FunctionWait).error(), baafe::CallError::FunctionCallInProgress); +} + +/** + * @brief Tests the connect function with different channel offsets. + */ +TEST(AsyncFunctionExecutorTestsStandalone, ConnectTest) { + baafe::AsyncFunctionExecutor executor1 { + baafe::Config { + .isProducer = true + }, + baafe::FunctionList { }, + std::make_unique() + }; + baafe::AsyncFunctionExecutor executor2 { + baafe::Config { + .isProducer = false + }, + baafe::FunctionList { }, + std::make_unique() + }; + + // Try a valid channel offset + int ret = executor1.connect(5); + ASSERT_EQ(ret, 0); + // Try an invalid channel offset + ret = executor2.connect(40000000); + ASSERT_EQ(ret, -1); +}