22
33#include < bringauto/async_function_execution/clients/AeronClient.hpp>
44#include < bringauto/async_function_execution/TimeoutIdleStrategy.hpp>
5+ #include < bringauto/async_function_execution/structures/Settings.hpp>
56
67#include < utility>
78#include < stdexcept>
@@ -18,7 +19,7 @@ namespace bringauto::async_function_execution {
1819struct Config {
1920 bool isProducer = true ;
2021 std::chrono::nanoseconds defaultTimeout = std::chrono::nanoseconds(0 );
21- // TODO per function config map (for now only config will be timeout)
22+ std::string_view functionConfigurations = " " ;
2223};
2324
2425
@@ -27,7 +28,7 @@ struct Config {
2728 * Supported range is 0-255
2829 */
2930struct FunctionId {
30- uint8_t value;
31+ const uint8_t value;
3132};
3233
3334
@@ -45,7 +46,7 @@ concept HasSerialize = requires(const T& t) {
4546 */
4647template <typename T>
4748struct Return {
48- T value;
49+ const T value;
4950 constexpr Return (T &&val) : value(std::forward<T>(val)) {}
5051};
5152
@@ -64,7 +65,7 @@ struct Return<void> {
6465 */
6566template <typename ... Args>
6667struct Arguments {
67- std::tuple<Args...> values;
68+ const std::tuple<Args...> values;
6869 constexpr Arguments (Args &&...args) : values{std::forward<Args>(args)...} {}
6970};
7071
@@ -74,9 +75,9 @@ struct Arguments {
7475 */
7576template <typename Ret, typename ... Args>
7677struct FunctionDefinition {
77- FunctionId id;
78- Return<Ret> returnType;
79- Arguments<Args...> argumentTypes;
78+ const FunctionId id;
79+ const Return<Ret> returnType;
80+ const Arguments<Args...> argumentTypes;
8081};
8182
8283
@@ -85,7 +86,7 @@ struct FunctionDefinition {
8586 */
8687template <typename ... Funcs>
8788struct FunctionList {
88- std::tuple<Funcs...> functions;
89+ const std::tuple<Funcs...> functions;
8990 FunctionList (Funcs&&... funcs) : functions(std::forward<Funcs>(funcs)...) {}
9091};
9192
@@ -106,15 +107,21 @@ class AsyncFunctionExecutor {
106107 AsyncFunctionExecutor (Config config,
107108 const FunctionList<Funcs...> &functions,
108109 std::unique_ptr<clients::ClientInterface> client = nullptr )
109- : client_(nullptr ), config_ (config), functions_(functions) {
110+ : client_(nullptr ), settings_ (config.isProducer, config.defaultTimeout, config.functionConfigurations ), functions_(functions) {
110111 // Default client if none is provided
111112 if (client) {
112113 client_ = std::move (client);
113114 } else {
114115 client_ = std::make_unique<clients::AeronClient<TimeoutIdleStrategy>>(
115- " aeron:ipc " , TimeoutIdleStrategy (config .defaultTimeout )
116+ DEFAULT_AERON_CONNECTION , TimeoutIdleStrategy (settings_ .defaultTimeout )
116117 );
117118 }
119+
120+ for (const auto & [funcId, _] : settings_.functionConfigs ) {
121+ if (!isFunctionDefined (FunctionId{funcId})) {
122+ throw std::runtime_error (" Warning: Function ID " + std::to_string (static_cast <int >(funcId)) + " in configuration is not defined in FunctionList." );
123+ }
124+ }
118125 };
119126
120127 ~AsyncFunctionExecutor () = default ;
@@ -129,11 +136,11 @@ class AsyncFunctionExecutor {
129136 std::vector<uint32_t > fromProducer;
130137
131138 std::apply ([&](auto &&... funcDefs) {
132- (toProducer.push_back (funcDefs.id .value + 1000 ), ...);
139+ (toProducer.push_back (funcDefs.id .value + MESSAGE_RETURN_CHANNEL_OFFSET ), ...);
133140 (fromProducer.push_back (funcDefs.id .value ), ...);
134141 }, std::get<0 >(functions_.functions ));
135142
136- if (config_ .isProducer ) {
143+ if (settings_ .isProducer ) {
137144 client_->connect (toProducer, fromProducer);
138145 } else {
139146 client_->connect (fromProducer, toProducer);
@@ -152,17 +159,20 @@ class AsyncFunctionExecutor {
152159 */
153160 template <typename Ret, typename ... FArgs, typename ... CallArgs>
154161 Ret callFunc (const FunctionDefinition<Ret, FArgs...> &function, CallArgs&&... args) {
155- if (!config_ .isProducer ) {
162+ if (!settings_ .isProducer ) {
156163 throw std::runtime_error (" Cannot call function in consumer mode" );
157164 }
158165 if (sizeof ...(CallArgs) < sizeof ...(FArgs)) {
159166 throw std::invalid_argument (" Argument count mismatch" );
160167 }
168+ if (!isFunctionDefined (function.id )) {
169+ throw std::runtime_error (" Function ID not defined" );
170+ }
161171
162172 auto messageBytes = serializeArgs (function.id , args...);
163173 client_->sendMessage (function.id .value , messageBytes);
164174
165- auto responseBytes = client_->waitForMessage (function.id .value + 1000 );
175+ auto responseBytes = client_->waitForMessage (function.id .value + MESSAGE_RETURN_CHANNEL_OFFSET );
166176 if (responseBytes.empty ()) {
167177 throw std::runtime_error (" No response received or timeout" );
168178 }
@@ -184,14 +194,14 @@ class AsyncFunctionExecutor {
184194 * @return A tuple containing the FunctionId and a span of argument bytes. Returns an empty tuple on error.
185195 */
186196 std::tuple<FunctionId, std::span<const uint8_t >> pollFunction () {
187- if (config_ .isProducer ) {
197+ if (settings_ .isProducer ) {
188198 std::cerr << " Cannot start polling in producer mode." << std::endl;
189- return {} ; // Error: Cannot start polling in producer mode
199+ return std::make_tuple (FunctionId{}, std::span< const uint8_t >{}) ; // Error: Cannot start polling in producer mode
190200 }
191201
192202 auto requestBytes = client_->waitForAnyMessage ();
193203 if (requestBytes.empty ()) {
194- return {} ; // Error: No message received or timeout
204+ return std::make_tuple (FunctionId{}, std::span< const uint8_t >{}) ; // Error: No message received or timeout
195205 }
196206
197207 auto [funcId, argBytes] = deserializeRequest (requestBytes);
@@ -209,10 +219,14 @@ class AsyncFunctionExecutor {
209219 */
210220 template <typename Ret, typename ... Args>
211221 auto getFunctionArgs (const FunctionDefinition<Ret, Args...> &function, const std::span<const uint8_t > &argBytes) {
212- if (config_ .isProducer ) {
222+ if (settings_ .isProducer ) {
213223 throw std::runtime_error (" Cannot get function arguments in producer mode" );
214224 }
215225
226+ if (!isFunctionDefined (function.id )) {
227+ throw std::runtime_error (" Function ID not defined" );
228+ }
229+
216230 if (argBytes.size () < 1 ) {
217231 throw std::invalid_argument (" Not enough data to read argument count" );
218232 }
@@ -227,7 +241,8 @@ class AsyncFunctionExecutor {
227241 std::tuple<Args...> args;
228242 auto extractArg = [&](auto &arg) {
229243 if (pos >= argBytes.size ()) throw std::invalid_argument (" Unexpected end of data while reading argument size" );
230- uint8_t len = argBytes[pos++];
244+ uint16_t len = argBytes[pos] | (static_cast <uint16_t >(argBytes[pos + 1 ]) << 8 );
245+ pos += 2 ;
231246 if (pos + len > argBytes.size ()) throw std::invalid_argument (" Unexpected end of data while reading argument content" );
232247
233248 if constexpr (HasSerialize<decltype (arg)>) {
@@ -257,21 +272,34 @@ class AsyncFunctionExecutor {
257272 */
258273 template <typename T>
259274 int sendReturnMessage (const FunctionId &functionId, const T &returnValue) {
260- if (config_.isProducer ) {
275+ if (!isFunctionDefined (functionId)) {
276+ std::cerr << " Function ID not defined." << std::endl;
277+ return -1 ; // Error: Function ID not defined
278+ }
279+
280+ if (settings_.isProducer ) {
261281 std::cerr << " Cannot send return message in producer mode." << std::endl;
262282 return -1 ; // Error: Cannot send return message in producer mode
263283 }
264284
265285 auto messageBytes = serializeReturn (functionId, returnValue);
266- return client_->sendMessage (functionId.value + 1000 , messageBytes);
286+ return client_->sendMessage (functionId.value + MESSAGE_RETURN_CHANNEL_OFFSET , messageBytes);
267287 }
268288
269289private:
290+ bool isFunctionDefined (const FunctionId &funcId) {
291+ bool found = false ;
292+ std::apply ([&](auto &&... funcDefs) {
293+ ((funcDefs.id .value == funcId.value ? found = true : false ), ...);
294+ }, std::get<0 >(functions_.functions ));
295+ return found;
296+ }
297+
270298 template <typename ... Args>
271299 std::span<const uint8_t > serializeArgs (const FunctionId &funcId, const Args&... args) {
272300 serializationBuffer_.clear ();
273301 std::size_t totalSize = 2 ; // Function ID + Argument count
274- ((totalSize += 1 + sizeof (args)), ...); // Each argument: size byte + data
302+ ((totalSize += 2 + sizeof (args)), ...); // Each argument: size byte + data
275303 serializationBuffer_.reserve (totalSize);
276304 serializationBuffer_.push_back (funcId.value );
277305 serializationBuffer_.push_back (static_cast <uint8_t >(sizeof ...(Args)));
@@ -283,7 +311,7 @@ class AsyncFunctionExecutor {
283311 template <typename T>
284312 std::span<const uint8_t > serializeReturn (const FunctionId &funcId, const T &returnValue) {
285313 serializationBuffer_.clear ();
286- std::size_t totalSize = 2 + sizeof (returnValue);
314+ std::size_t totalSize = 3 + sizeof (returnValue);
287315 serializationBuffer_.reserve (totalSize);
288316 serializationBuffer_.push_back (funcId.value );
289317 appendArg (serializationBuffer_, returnValue);
@@ -295,14 +323,18 @@ class AsyncFunctionExecutor {
295323 void appendArg (std::vector<uint8_t >& buffer, const T& arg) {
296324 if constexpr (HasSerialize<T>) {
297325 auto bytes = arg.serialize ();
298- if (bytes.size () > 255 ) {
326+ if (bytes.size () > MAX_ARGUMENT_SIZE ) {
299327 throw std::invalid_argument (" Serialized data too large" );
300328 }
301- buffer.push_back (static_cast <uint8_t >(bytes.size ()));
329+ uint16_t size = static_cast <uint16_t >(bytes.size ());
330+ buffer.push_back (static_cast <uint8_t >(size & 0xFF ));
331+ buffer.push_back (static_cast <uint8_t >((size >> 8 ) & 0xFF ));
302332 buffer.insert (buffer.end (), bytes.begin (), bytes.end ());
303333 } else {
304334 static_assert (std::is_trivially_copyable_v<T>, " Argument type must be trivially copyable" );
305- buffer.push_back (static_cast <uint8_t >(sizeof (arg)));
335+ uint16_t size = static_cast <uint16_t >(sizeof (arg));
336+ buffer.push_back (static_cast <uint8_t >(size & 0xFF ));
337+ buffer.push_back (static_cast <uint8_t >((size >> 8 ) & 0xFF ));
306338 buffer.insert (buffer.end (), reinterpret_cast <const uint8_t *>(&arg), reinterpret_cast <const uint8_t *>(&arg) + sizeof (T));
307339 }
308340 }
@@ -316,9 +348,9 @@ class AsyncFunctionExecutor {
316348
317349 T value;
318350 if constexpr (HasSerialize<T>) {
319- value.deserialize (std::span {bytes.data () + 2 , bytes.size () - 2 });
351+ value.deserialize (std::span {bytes.data () + 3 , bytes.size () - 3 });
320352 } else {
321- std::memcpy (&value, bytes.data () + 2 , sizeof (T));
353+ std::memcpy (&value, bytes.data () + 3 , sizeof (T));
322354 }
323355 return value;
324356 }
@@ -337,7 +369,7 @@ class AsyncFunctionExecutor {
337369 // / Buffer used for serialization of messages.
338370 mutable std::vector<uint8_t > serializationBuffer_;
339371 std::unique_ptr<clients::ClientInterface> client_;
340- Config config_ ;
372+ structures::Settings settings_ ;
341373 FunctionList<Funcs...> functions_;
342374};
343375
0 commit comments