From 5adc5dcf153db4140bb03da1b13da355bcc2571a Mon Sep 17 00:00:00 2001 From: Jochem Rutgers <68805714+jhrutgers@users.noreply.github.com> Date: Fri, 21 Nov 2025 16:52:26 +0100 Subject: [PATCH 01/14] bump version --- version/version.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version/version.txt b/version/version.txt index 227cea21..2e21232a 100644 --- a/version/version.txt +++ b/version/version.txt @@ -1 +1 @@ -2.0.0 +2.0.1-alpha From 7fd196eb5ee4cb5284dff0ed5fa2e9480758188a Mon Sep 17 00:00:00 2001 From: Jochem Rutgers <68805714+jhrutgers@users.noreply.github.com> Date: Tue, 25 Nov 2025 17:34:28 +0100 Subject: [PATCH 02/14] add arith ops --- include/libstored/types.h | 66 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 63 insertions(+), 3 deletions(-) diff --git a/include/libstored/types.h b/include/libstored/types.h index 61def64c..ce44e77b 100644 --- a/include/libstored/types.h +++ b/include/libstored/types.h @@ -54,8 +54,8 @@ struct Type { Uint32 = FlagFixed | FlagInt | 3U, Int64 = FlagFixed | FlagInt | FlagSigned | 7U, Uint64 = FlagFixed | FlagInt | 7U, - Int = FlagFixed | FlagInt | (sizeof(int) - 1), - Uint = FlagFixed | (sizeof(int) - 1), + Int = FlagFixed | FlagInt | FlagSigned | (sizeof(int) - 1), + Uint = FlagFixed | FlagInt | (sizeof(int) - 1), // things with fixed length Float = FlagFixed | FlagSigned | 3U, @@ -96,6 +96,15 @@ struct Type { return isFixed(t) && (t & FlagInt); } + /*! + * \brief Checks if the given type is a floating point number, or is a function with + * such an argument. + */ + static constexpr bool isFloat(type t) noexcept + { + return isFixed(t) && isSigned(t) && !isInt(t); + } + /*! * \brief Checks if the given type is signed number, or is a function * with such an argument. @@ -341,6 +350,47 @@ struct fromType { (unsigned int)T&(unsigned int)~Type::FlagFunction)>::type type; }; +# define STORED_VARIABLE_MEMBER_ARITH_OP(Class, T, op) \ + Class& operator op##=(type v) noexcept \ + { \ + stored_assert(Type::isInt(toType::type) || Type::isFloat(toType::type)); \ + set(get() op v); \ + return *this; \ + } + +# define STORED_VARIABLE_MEMBER_ARITH_BITOP(Class, T, op) \ + Class& operator op##=(type v) noexcept \ + { \ + stored_assert(Type::isInt(toType::type)); \ + set(get() op v); \ + return *this; \ + } + +# define STORED_VARIABLE_MEMBER_ARITH_OPS(Class, T) \ + STORED_VARIABLE_MEMBER_ARITH_OP(Class, T, +) \ + STORED_VARIABLE_MEMBER_ARITH_OP(Class, T, -) \ + STORED_VARIABLE_MEMBER_ARITH_OP(Class, T, *) \ + STORED_VARIABLE_MEMBER_ARITH_OP(Class, T, /) \ + STORED_VARIABLE_MEMBER_ARITH_OP(Class, T, %) \ + STORED_VARIABLE_MEMBER_ARITH_BITOP(Class, T, &) \ + STORED_VARIABLE_MEMBER_ARITH_BITOP(Class, T, |) \ + STORED_VARIABLE_MEMBER_ARITH_BITOP(Class, T, ^) \ + STORED_VARIABLE_MEMBER_ARITH_BITOP(Class, T, <<) \ + STORED_VARIABLE_MEMBER_ARITH_BITOP(Class, T, >>) \ + \ + Class& operator++() noexcept \ + { \ + stored_assert(Type::isInt(toType::type)); \ + return *this += 1; \ + } \ + \ + Class& operator--() noexcept \ + { \ + stored_assert(Type::isInt(toType::type)); \ + return *this -= 1; \ + } + + template class Variant; @@ -520,6 +570,8 @@ class Variable { return sizeof(type); } + STORED_VARIABLE_MEMBER_ARITH_OPS(Variable, T) + protected: /*! * \brief Returns the buffer this Variable points to. @@ -537,7 +589,7 @@ class Variable { private: /*! \brief The buffer of this Variable. */ type* m_buffer; -}; +}; // namespace stored /*! * \brief A typed variable in a store, with hook support. @@ -775,6 +827,8 @@ class Variable : public Variable { container().hookExitRO(toType::type, &this->buffer(), sizeof(type)); } + STORED_VARIABLE_MEMBER_ARITH_OPS(Variable, T) + private: /*! \brief The container of this Variable. */ Container* m_container; @@ -2124,6 +2178,8 @@ class StoreVariable { { return sizeof(type); } + + STORED_VARIABLE_MEMBER_ARITH_OPS(StoreVariable, T) }; /*! @@ -2402,6 +2458,10 @@ class StoreVariantF { }; } // namespace impl +# undef STORED_VARIABLE_MEMBER_ARITH_OPS +# undef STORED_VARIABLE_MEMBER_ARITH_BITOP +# undef STORED_VARIABLE_MEMBER_ARITH_OP + } // namespace stored #endif // __cplusplus #endif // LIBSTORED_TYPES_H From 0cbeb9f284ebbd7452a2cd59fb335726c35f9e66 Mon Sep 17 00:00:00 2001 From: Jochem Rutgers <68805714+jhrutgers@users.noreply.github.com> Date: Tue, 25 Nov 2025 17:36:14 +0100 Subject: [PATCH 03/14] add lossy sync example --- examples/8_sync/CMakeLists.txt | 5 +- examples/8_sync/main.cpp | 4 +- examples/CMakeLists.txt | 37 +- examples/lib/CMakeLists.txt | 6 + .../{8_sync => lib/include}/getopt_mini.h | 12 +- examples/{8_sync => lib/src}/getopt_mini.cpp | 10 +- examples/lossy_sync/.gitignore | 5 + examples/lossy_sync/CMakeLists.txt | 12 + examples/lossy_sync/ExampleSync.st | 7 + examples/lossy_sync/main.cpp | 347 ++++++++++++++++++ include/libstored/protocol.h | 34 +- python/.gitignore | 3 +- src/protocol.cpp | 46 ++- src/synchronizer.cpp | 4 +- 14 files changed, 502 insertions(+), 30 deletions(-) create mode 100644 examples/lib/CMakeLists.txt rename examples/{8_sync => lib/include}/getopt_mini.h (62%) rename examples/{8_sync => lib/src}/getopt_mini.cpp (84%) create mode 100644 examples/lossy_sync/.gitignore create mode 100644 examples/lossy_sync/CMakeLists.txt create mode 100644 examples/lossy_sync/ExampleSync.st create mode 100644 examples/lossy_sync/main.cpp diff --git a/examples/8_sync/CMakeLists.txt b/examples/8_sync/CMakeLists.txt index 99fc428b..52216e9b 100644 --- a/examples/8_sync/CMakeLists.txt +++ b/examples/8_sync/CMakeLists.txt @@ -1,9 +1,10 @@ -# SPDX-FileCopyrightText: 2020-2024 Jochem Rutgers +# SPDX-FileCopyrightText: 2020-2025 Jochem Rutgers # # SPDX-License-Identifier: CC0-1.0 if(LIBSTORED_HAVE_LIBZMQ) - add_executable(8_sync main.cpp getopt_mini.cpp) + add_executable(8_sync main.cpp) + target_link_libraries(8_sync PRIVATE example_lib) libstored_generate(TARGET 8_sync STORES ExampleSync1.st ExampleSync2.st) install(TARGETS 8_sync RUNTIME DESTINATION bin) add_launch_json(8_sync) diff --git a/examples/8_sync/main.cpp b/examples/8_sync/main.cpp index 0d930819..afbb1d1e 100644 --- a/examples/8_sync/main.cpp +++ b/examples/8_sync/main.cpp @@ -1,4 +1,4 @@ -// SPDX-FileCopyrightText: 2020-2023 Jochem Rutgers +// SPDX-FileCopyrightText: 2020-2025 Jochem Rutgers // // SPDX-License-Identifier: CC0-1.0 @@ -25,7 +25,7 @@ #include #include -#include "getopt_mini.h" +#include static stored::Synchronizer synchronizer; diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index f4707b63..66be2e13 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -36,6 +36,28 @@ if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND NOT APPLE) set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,--gc-sections") endif() +if(NOT CMAKE_CXX_STANDARD) + foreach( + cxx + cxx_std_26 + cxx_std_23 + cxx_std_20 + cxx_std_17 + cxx_std_14 + cxx_std_11 + cxx_std_98 + ) + list(FIND CMAKE_CXX_COMPILE_FEATURES "${cxx}" _cxx) + if(${_cxx} GREATER -1) + string(REPLACE "cxx_std_" "" _std "${cxx}") + set(CMAKE_CXX_STANDARD "${_std}") + break() + endif() + endforeach() +endif() + +add_subdirectory(lib) + add_subdirectory(1_hello) add_subdirectory(2_basic) add_subdirectory(3_scope) @@ -61,6 +83,10 @@ if(LIBSTORED_HAVE_LIBZMQ) add_subdirectory(components) add_subdirectory(control) add_subdirectory(zmqserver) + + if(CMAKE_CXX_STANDARD LESS 98 AND CMAKE_CXX_STANDARD GREATER_EQUAL 11) + add_subdirectory(lossy_sync) + endif() endif() if(LIBSTORED_HAVE_ZTH AND LIBSTORED_HAVE_LIBZMQ) @@ -71,13 +97,6 @@ if(LIBSTORED_PYLIBSTORED AND TARGET pylibstored-install) add_subdirectory(int_pip) endif() -list(FIND CMAKE_CXX_COMPILE_FEATURES "cxx_std_17" _cxx17) -if(${_cxx17} GREATER -1) - if(NOT CMAKE_CXX_STANDARD) - set(CMAKE_CXX_STANDARD 17) - endif() - - if(CMAKE_CXX_STANDARD GREATER_EQUAL 17 AND CMAKE_CXX_STANDARD LESS 98) - add_subdirectory(pipes) - endif() +if(CMAKE_CXX_STANDARD GREATER_EQUAL 17 AND CMAKE_CXX_STANDARD LESS 98) + add_subdirectory(pipes) endif() diff --git a/examples/lib/CMakeLists.txt b/examples/lib/CMakeLists.txt new file mode 100644 index 00000000..4760e42b --- /dev/null +++ b/examples/lib/CMakeLists.txt @@ -0,0 +1,6 @@ +# SPDX-FileCopyrightText: 2020-2025 Jochem Rutgers +# +# SPDX-License-Identifier: CC0-1.0 + +add_library(example_lib STATIC include/getopt_mini.h src/getopt_mini.cpp) +target_include_directories(example_lib PUBLIC include) diff --git a/examples/8_sync/getopt_mini.h b/examples/lib/include/getopt_mini.h similarity index 62% rename from examples/8_sync/getopt_mini.h rename to examples/lib/include/getopt_mini.h index cb85dc53..3e0c3f4e 100644 --- a/examples/8_sync/getopt_mini.h +++ b/examples/lib/include/getopt_mini.h @@ -1,15 +1,13 @@ #ifndef GETOPT_MINI_H #define GETOPT_MINI_H -// SPDX-FileCopyrightText: 2020-2023 Jochem Rutgers +// SPDX-FileCopyrightText: 2020-2025 Jochem Rutgers // // SPDX-License-Identifier: MIT -#include - -#ifdef STORED_OS_POSIX +#if defined(__linux__) || defined(__APPLE__) // Just use glibc's one. -# include -#else // STORED_OS_POSIX +# include +#else // !POSIX extern int opterr; extern int optopt; @@ -19,5 +17,5 @@ extern char* optarg; // flawfinder: ignore int getopt(int argc, char* const* argv, char const* options); -#endif // !STORED_OS_POSIX +#endif // !POSIX #endif // GETOPT_MINI_H diff --git a/examples/8_sync/getopt_mini.cpp b/examples/lib/src/getopt_mini.cpp similarity index 84% rename from examples/8_sync/getopt_mini.cpp rename to examples/lib/src/getopt_mini.cpp index b4ed3f4c..3afc64ca 100644 --- a/examples/8_sync/getopt_mini.cpp +++ b/examples/lib/src/getopt_mini.cpp @@ -1,12 +1,12 @@ -// SPDX-FileCopyrightText: 2020-2023 Jochem Rutgers +// SPDX-FileCopyrightText: 2020-2025 Jochem Rutgers // // SPDX-License-Identifier: MIT -#include "getopt_mini.h" +#include -#ifdef STORED_COMPILER_MSVC +#if !defined(__linux__) && !defined(__APPLE__) -# include +# include int opterr = 1; int optopt = 0; @@ -61,4 +61,4 @@ int getopt(int argc, char* const* argv, char const* options) return optopt; } -#endif // STORED_COMPILER_MSVC +#endif // !POSIX diff --git a/examples/lossy_sync/.gitignore b/examples/lossy_sync/.gitignore new file mode 100644 index 00000000..e5c3e6bb --- /dev/null +++ b/examples/lossy_sync/.gitignore @@ -0,0 +1,5 @@ +# SPDX-FileCopyrightText: 2020-2025 Jochem Rutgers +# +# SPDX-License-Identifier: CC0-1.0 + +/libstored/ diff --git a/examples/lossy_sync/CMakeLists.txt b/examples/lossy_sync/CMakeLists.txt new file mode 100644 index 00000000..43041390 --- /dev/null +++ b/examples/lossy_sync/CMakeLists.txt @@ -0,0 +1,12 @@ +# SPDX-FileCopyrightText: 2020-2025 Jochem Rutgers +# +# SPDX-License-Identifier: CC0-1.0 + +if(LIBSTORED_HAVE_LIBZMQ) + add_executable(lossy_sync main.cpp) + set_target_properties(lossy_sync PROPERTIES CXX_STANDARD 11 CXX_STANDARD_REQUIRED YES) + target_link_libraries(lossy_sync PRIVATE example_lib) + libstored_generate(TARGET lossy_sync STORES ExampleSync.st) + install(TARGETS lossy_sync RUNTIME DESTINATION bin) + add_launch_json(lossy_sync) +endif() diff --git a/examples/lossy_sync/ExampleSync.st b/examples/lossy_sync/ExampleSync.st new file mode 100644 index 00000000..42224ef4 --- /dev/null +++ b/examples/lossy_sync/ExampleSync.st @@ -0,0 +1,7 @@ +// SPDX-FileCopyrightText: 2020-2025 Jochem Rutgers +// +// SPDX-License-Identifier: CC0-1.0 + +int32 restarted +int32 i +double d diff --git a/examples/lossy_sync/main.cpp b/examples/lossy_sync/main.cpp new file mode 100644 index 00000000..f5ad5633 --- /dev/null +++ b/examples/lossy_sync/main.cpp @@ -0,0 +1,347 @@ +// SPDX-FileCopyrightText: 2020-20255555 Jochem Rutgers +// +// SPDX-License-Identifier: CC0-1.0 + +/*! + * \file + * \brief Example with synchronization between server and client with a lossy + * channel. + */ + +#include "ExampleSync.h" + +#include +#include +#include +#include +#include +#include + +#include + +///////////////////////////////////////////////////////////////////////// +// The store +// + +class ExampleSync : public STORE_T(ExampleSync, stored::Synchronizable, stored::ExampleSyncBase) { + STORE_CLASS(ExampleSync, stored::Synchronizable, stored::ExampleSyncBase) +public: + ExampleSync() is_default +}; + + + +///////////////////////////////////////////////////////////////////////// +// Argument parsing and help +// + +static int parse_port(char const* str) +{ + char* endptr = nullptr; + long port = strtol(str, &endptr, 0); + if(*endptr || port <= 0 || port >= 0x10000) + throw std::invalid_argument{"Invalid port"}; + return (int)port; +} + +static void print_help(FILE* out, char const* progname) +{ + fprintf(out, "Usage: %s [-h] [-v] [-p ] {-s |-c }\n", progname); + fprintf(out, "where\n"); + fprintf(out, " -h Show this help message.\n"); + fprintf(out, " -s Server 0MQ endpoint for downstream sync.\n"); + fprintf(out, " -c Client 0MQ endpoint for upstream sync.\n"); + fprintf(out, " -p Set debugger's port. Default: %d\n", + stored::DebugZmqLayer::DefaultPort); + fprintf(out, " -v Verbose output of sync connections.\n"); +} + +struct Arguments { + bool verbose = false; + int debug_port = stored::DebugZmqLayer::DefaultPort; + int client_port = 0; + int server_port = 0; +}; + +class exit_now : public std::exception {}; + +static Arguments parse_arguments(int argc, char** argv) +{ + Arguments args; + + int c; + // flawfinder: ignore + while((c = getopt(argc, argv, "hs:c:p:v")) != -1) { + switch(c) { + case 'p': + try { + args.debug_port = parse_port(optarg); + } catch(std::invalid_argument&) { + fprintf(stderr, "Invalid debug port '%s'\n", optarg); + throw; + } + break; + case 'v': + args.verbose = true; + printf("Enable verbose output\n"); + break; + case 's': + try { + args.server_port = parse_port(optarg); + } catch(std::invalid_argument&) { + fprintf(stderr, "Invalid server port '%s'\n", optarg); + throw; + } + break; + case 'c': + try { + args.client_port = parse_port(optarg); + } catch(std::invalid_argument&) { + fprintf(stderr, "Invalid client port '%s'\n", optarg); + throw; + } + break; + case 'h': + print_help(stdout, argv[0]); + throw exit_now(); + default: + print_help(stderr, argv[0]); + throw std::invalid_argument{""}; + } + } + + if(args.client_port && args.server_port) { + fprintf(stderr, "Cannot be both client and server\n"); + throw std::invalid_argument{""}; + } + + if(!args.client_port && !args.server_port) { + fprintf(stderr, "Must be either client or server\n"); + throw std::invalid_argument{""}; + } + + return args; +} + + + +///////////////////////////////////////////////////////////////////////// +// The stacks +// + +/*! + * \brief ZeroMQ interface for the debugger. + */ +class DebugStack { + STORED_CLASS_NOCOPY(DebugStack) +public: + explicit DebugStack(ExampleSync& store, int port) + : m_debugger{"lossy_sync"} + , m_debugLayer{nullptr, port} + { + if((errno = m_debugLayer.lastError())) { + fprintf(stderr, "Cannot initialize ZMQ for debugging, got error %d; %s\n", + errno, zmq_strerror(errno)); + throw std::runtime_error{"ZMQ initialization failed"}; + } + + m_debugger.map(store); + m_debugLayer.wrap(m_debugger); + } + + stored::Pollable& pollable() + { + return m_pollable; + } + + void recv() + { + int res = m_debugLayer.recv(); + + switch(res) { + case 0: + case EAGAIN: + return; + default: + fprintf(stderr, "Debugger recv failed with error %d; %s\n", res, + zmq_strerror(res)); + break; + } + } + +private: + stored::Debugger m_debugger; + stored::DebugZmqLayer m_debugLayer; + stored::PollableZmqSocket m_pollable{m_debugLayer.socket(), stored::Pollable::PollIn}; +}; + +/*! + * \brief Lossy synchronization stack. + */ +class SyncStack { + STORED_CLASS_NOCOPY(SyncStack) +public: + explicit SyncStack( + ExampleSync& store, char const* endpoint, bool server, bool verbose, float ber = 0) + : m_syncLayer(nullptr, endpoint, server) + { + if((errno = m_syncLayer.lastError())) { + fprintf(stderr, "Cannot initialize ZMQ for sync, got error %d; %s\n", errno, + zmq_strerror(errno)); + throw std::runtime_error{"ZMQ initialization failed"}; + } + + // We don't want to do ARQ on large messages, so we segment them to some + // appropriate size. + wrap(32U); + // Perform retransmits. + m_arq = &wrap(); + // Check if we have communication at all. + m_idle = &wrap(); + // Do CRC checks. Do this below the ARQ, such that the ARQ sees no or + // correct messages. + wrap(); + // Do escaping to allow framing by the terminal layer. + wrap(); + // Framing. + wrap(); + if(ber > 0) + // The server simulates a lossy channel. + wrap(ber); + // Verbose output. + if(verbose) + wrap(); + + // Connect to I/O. + m_syncLayer.wrap(*m_layers.back()); + + // Register the store... + m_synchronizer.map(store); + // ...and the protocol stack. + m_synchronizer.connect(**m_layers.begin()); + + // There we go! + if(!server) + m_synchronizer.syncFrom(store, m_syncLayer); + } + + stored::Pollable& pollable() + { + return m_pollable; + } + + void recv() + { + int res = m_syncLayer.recv(); + + switch(res) { + case 0: + case EAGAIN: + return; + default: + fprintf(stderr, "Sync recv failed with error %d; %s\n", res, + zmq_strerror(res)); + break; + } + } + +protected: + template + T& wrap(Args&&... args) + { + auto* p = new T{std::forward(args)...}; + std::unique_ptr layer{p}; + + if(!m_layers.empty()) + layer->wrap(*m_layers.back()); + + m_layers.emplace_back(std::move(layer)); + return *p; + } + +private: + stored::Synchronizer m_synchronizer; + stored::ArqLayer* m_arq = nullptr; + stored::IdleCheckLayer* m_idle = nullptr; + std::list> m_layers; + stored::SyncZmqLayer m_syncLayer; + stored::PollableZmqSocket m_pollable{m_syncLayer.socket(), stored::Pollable::PollIn}; +}; + + + +///////////////////////////////////////////////////////////////////////// +// Main function +// + +class disconnected : public std::exception {}; + +static void run(Arguments const& args, ExampleSync& store, DebugStack& debugStack) +{ + std::unique_ptr syncStack; + + if(args.client_port) { + char endpoint[32]{}; + int res = snprintf( + endpoint, sizeof(endpoint), "tcp://localhost:%d", args.client_port); + if(res < 0 || (size_t)res >= sizeof(endpoint)) + throw std::runtime_error{"Endpoint string too long"}; + + syncStack.reset(new SyncStack{store, endpoint, false, args.verbose}); + } else if(args.server_port) { + char endpoint[32]{}; + int res = snprintf( + endpoint, sizeof(endpoint), "tcp://localhost:%d", args.server_port); + if(res < 0 || (size_t)res >= sizeof(endpoint)) + throw std::runtime_error{"Endpoint string too long"}; + + syncStack.reset(new SyncStack{store, endpoint, true, args.verbose}); + } + + stored::Poller poller; + if((errno = poller.add(debugStack.pollable()))) { + perror("Cannot add pollable"); + throw std::runtime_error{"Poller add failed"}; + } + + if((errno = poller.add(syncStack->pollable()))) { + perror("Cannot add pollable"); + throw std::runtime_error{"Poller add failed"}; + } + + while(true) { + poller.poll(200); + debugStack.recv(); + syncStack->recv(); + } +} + +int main(int argc, char** argv) +{ + try { + Arguments args = parse_arguments(argc, argv); + + ExampleSync store; + DebugStack debugStack{store, args.debug_port}; + + while(true) { + try { + run(args, store, debugStack); + } catch(disconnected&) { + fprintf(stderr, "Disconnected, restarting...\n"); + std::this_thread::sleep_for(std::chrono::seconds(1)); + ++store.restarted; + } + } + } catch(exit_now&) { + return 0; + } catch(std::invalid_argument&) { + return 1; + } catch(std::exception& e) { + fprintf(stderr, "Error: %s\n", e.what()); + return 2; + } catch(...) { + fprintf(stderr, "Unknown error\n"); + return 3; + } +} diff --git a/include/libstored/protocol.h b/include/libstored/protocol.h index ff336256..aeb8cc4b 100644 --- a/include/libstored/protocol.h +++ b/include/libstored/protocol.h @@ -1,6 +1,6 @@ #ifndef LIBSTORED_PROTOCOL_H #define LIBSTORED_PROTOCOL_H -// SPDX-FileCopyrightText: 2020-2024 Jochem Rutgers +// SPDX-FileCopyrightText: 2020-2025 Jochem Rutgers // // SPDX-License-Identifier: MPL-2.0 @@ -968,6 +968,38 @@ class PrintLayer : public ProtocolLayer { bool m_enable; }; +/*! + * \brief A layer that deliberately injects bit errors into the message stream. + * + * Depending on the given bit error rate (BER), bits are flipped randomly in + * both the encoded and decoded messages. + * + * Mainly for debugging purposes. + */ +class LossyLayer : public ProtocolLayer { + STORED_CLASS_NOCOPY(LossyLayer) +public: + typedef ProtocolLayer base; + explicit LossyLayer( + float ber = 0, ProtocolLayer* up = nullptr, ProtocolLayer* down = nullptr); + + virtual ~LossyLayer() override is_default + + virtual void decode(void* buffer, size_t len) override; + virtual void encode(void const* buffer, size_t len, bool last = true) override; +# ifndef DOXYGEN + using base::encode; +# endif + + float ber() const; + void ber(float ber); + +private: + float m_ber; + int m_bitThreshold; + int m_byteThreshold; +}; + /*! * \brief A layer that tracks if it sees communication through the stack. * diff --git a/python/.gitignore b/python/.gitignore index 7b951fd0..ea34942a 100644 --- a/python/.gitignore +++ b/python/.gitignore @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: 2020-2023 Jochem Rutgers +# SPDX-FileCopyrightText: 2020-2025 Jochem Rutgers # # SPDX-License-Identifier: CC0-1.0 @@ -7,3 +7,4 @@ /build/ /version.txt /libstored/data +/libstored-*/ diff --git a/src/protocol.cpp b/src/protocol.cpp index fa10bf6a..ee195a96 100644 --- a/src/protocol.cpp +++ b/src/protocol.cpp @@ -1,4 +1,4 @@ -// SPDX-FileCopyrightText: 2020-2024 Jochem Rutgers +// SPDX-FileCopyrightText: 2020-2025 Jochem Rutgers // // SPDX-License-Identifier: MPL-2.0 @@ -1723,6 +1723,50 @@ bool PrintLayer::enabled() const +////////////////////////////// +// LossyLayer +// + +LossyLayer::LossyLayer(float ber, ProtocolLayer* up, ProtocolLayer* down) + : base(up, down) + , m_ber() + , m_bitThreshold() + , m_byteThreshold() +{ + this->ber(ber); +} + +void LossyLayer::decode(void* buffer, size_t len) +{ + base::decode(buffer, len); +} + +void LossyLayer::encode(void const* buffer, size_t len, bool last) +{ + base::encode(buffer, len, last); +} + +float LossyLayer::ber() const +{ + return m_ber; +} + +void LossyLayer::ber(float ber) +{ + if(ber < 0.F || std::isnan(ber)) + ber = 0.F; + else if(ber > 1.F) + ber = 1.F; + + m_ber = ber; + m_bitThreshold = + (int)(std::min(RAND_MAX, (unsigned int)(ber * (float)RAND_MAX))); + m_byteThreshold = (int)(std::min( + RAND_MAX, (unsigned int)(std::pow(ber, 8.F) * (float)RAND_MAX))); +} + + + ////////////////////////////// // Loopback // diff --git a/src/synchronizer.cpp b/src/synchronizer.cpp index 57e66914..2a36738e 100644 --- a/src/synchronizer.cpp +++ b/src/synchronizer.cpp @@ -1,4 +1,4 @@ -// SPDX-FileCopyrightText: 2020-2024 Jochem Rutgers +// SPDX-FileCopyrightText: 2020-2025 Jochem Rutgers // // SPDX-License-Identifier: MPL-2.0 @@ -1397,7 +1397,7 @@ StoreJournal* Synchronizer::toJournal(char const* hash) const * \brief Connect the given connection to this Synchronizer. * * A #stored::SyncConnection is instantiated on top of the given protocol stack. - * This SyncConnection is the OSI Application layer of the synchronization prococol. + * This SyncConnection is the OSI Application layer of the synchronization protocol. * * \return the #stored::SyncConnection, which is valid until #disconnect() is called */ From ef6afed8a5177a0c00738193c4cc69ccf2402ed5 Mon Sep 17 00:00:00 2001 From: Jochem Rutgers <68805714+jhrutgers@users.noreply.github.com> Date: Wed, 26 Nov 2025 16:13:39 +0100 Subject: [PATCH 04/14] fix naming --- include/libstored/poller.h | 7 ++--- include/libstored/zmq.h | 36 ++++++++++++++----------- sphinx/doc/cpp_protocol.rst | 19 ++++++++++--- src/zmq.cpp | 54 ++++++++++++++++++------------------- 4 files changed, 67 insertions(+), 49 deletions(-) diff --git a/include/libstored/poller.h b/include/libstored/poller.h index 1835251e..b4e7bb48 100644 --- a/include/libstored/poller.h +++ b/include/libstored/poller.h @@ -443,15 +443,16 @@ inline PollableZmqSocket pollable(void* s, Pollable::Events const& events, void* class PollableZmqLayer : public TypedPollable { STORED_POLLABLE_TYPE(PollableZmqLayer) public: - constexpr PollableZmqLayer(ZmqLayer& l, Events const& e, void* user = nullptr) noexcept + constexpr PollableZmqLayer(ZmqBaseLayer& l, Events const& e, void* user = nullptr) noexcept : TypedPollable(e, user) , layer(&l) {} - ZmqLayer* layer; + ZmqBaseLayer* layer; }; -inline PollableZmqLayer pollable(ZmqLayer& l, Pollable::Events const& events, void* user = nullptr) +inline PollableZmqLayer +pollable(ZmqBaseLayer& l, Pollable::Events const& events, void* user = nullptr) { return PollableZmqLayer(l, events, user); } diff --git a/include/libstored/zmq.h b/include/libstored/zmq.h index 8a4f2aba..894f9af4 100644 --- a/include/libstored/zmq.h +++ b/include/libstored/zmq.h @@ -1,6 +1,6 @@ #ifndef LIBSTORED_ZMQ_H #define LIBSTORED_ZMQ_H -// SPDX-FileCopyrightText: 2020-2023 Jochem Rutgers +// SPDX-FileCopyrightText: 2020-2025 Jochem Rutgers // // SPDX-License-Identifier: MPL-2.0 @@ -19,21 +19,22 @@ namespace stored { /*! - * \brief A protocol layer that wraps the protocol stack and implements ZeroMQ socket around it. + * \brief A protocol layer that wraps the protocol stack and implements ZeroMQ + * socket around it. * * This is a generic ZeroMQ class, for practical usages, instantiate * #stored::DebugZmqLayer or #stored::SyncZmqLayer instead. */ -class ZmqLayer : public PolledSocketLayer { - STORED_CLASS_NOCOPY(ZmqLayer) +class ZmqBaseLayer : public PolledSocketLayer { + STORED_CLASS_NOCOPY(ZmqBaseLayer) public: typedef PolledSocketLayer base; using base::fd_type; - ZmqLayer( + ZmqBaseLayer( void* context, int type, ProtocolLayer* up = nullptr, ProtocolLayer* down = nullptr); - virtual ~ZmqLayer() override; + virtual ~ZmqBaseLayer() override; void* context() const; void* socket() const; @@ -70,10 +71,10 @@ class ZmqLayer : public PolledSocketLayer { * \brief Constructs a protocol stack on top of a REQ/REP ZeroMQ socket, specifically for the * #stored::Debugger. */ -class DebugZmqLayer : public ZmqLayer { +class DebugZmqLayer : public ZmqBaseLayer { STORED_CLASS_NOCOPY(DebugZmqLayer) public: - typedef ZmqLayer base; + typedef ZmqBaseLayer base; enum STORED_ANONYMOUS { DefaultPort = 19026, @@ -89,21 +90,26 @@ class DebugZmqLayer : public ZmqLayer { }; /*! - * \brief Constructs a protocol stack on top of a PAIR ZeroMQ socket, specifically for the - * #stored::Synchronizer. + * \brief Generic ZeroMQ DEALER socket, for raw byte I/O via sockets. */ -class SyncZmqLayer : public ZmqLayer { - STORED_CLASS_NOCOPY(SyncZmqLayer) +class ZmqLayer : public ZmqBaseLayer { + STORED_CLASS_NOCOPY(ZmqLayer) public: - typedef ZmqLayer base; + typedef ZmqBaseLayer base; - SyncZmqLayer( + ZmqLayer( void* context, char const* endpoint, bool listen, ProtocolLayer* up = nullptr, ProtocolLayer* down = nullptr); /*! \brief Dtor. */ - virtual ~SyncZmqLayer() override is_default + virtual ~ZmqLayer() override is_default }; +/*! + * \brief Constructs a protocol stack on top of a DEALER ZeroMQ socket, + * specifically for the #stored::Synchronizer. + */ +typedef ZmqLayer SyncZmqLayer; + } // namespace stored # endif // STORED_HAVE_ZMQ diff --git a/sphinx/doc/cpp_protocol.rst b/sphinx/doc/cpp_protocol.rst index 67eb1934..bff61434 100644 --- a/sphinx/doc/cpp_protocol.rst +++ b/sphinx/doc/cpp_protocol.rst @@ -123,10 +123,11 @@ The inheritance of the layers is shown below. ProtocolLayer <|-- Debugger ProtocolLayer <|-- SyncConnection - abstract ZmqLayer - PolledSocketLayer <|-- ZmqLayer - ZmqLayer <|-- DebugZmqLayer - ZmqLayer <|-- SyncZmqLayer + abstract ZmqBaseLayer + PolledSocketLayer <|-- ZmqBaseLayer + ZmqBaseLayer <|-- DebugZmqLayer + ZmqBaseLayer <|-- SyncZmqLayer + ZmqBaseLayer <|-- ZmqLayer class Loopback FifoLoopback --> FifoLoopback1 @@ -247,6 +248,16 @@ stored::XsimLayer .. doxygenclass:: stored::XsimLayer +stored::ZmqBaseLayer +-------------------- + +.. doxygenclass:: stored::ZmqBaseLayer + +stored::ZmqLayer +-------------------- + +.. doxygenclass:: stored::ZmqLayer + Abstract classes diff --git a/src/zmq.cpp b/src/zmq.cpp index 1de8a22b..72458171 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -1,4 +1,4 @@ -// SPDX-FileCopyrightText: 2020-2024 Jochem Rutgers +// SPDX-FileCopyrightText: 2020-2025 Jochem Rutgers // // SPDX-License-Identifier: MPL-2.0 @@ -6,27 +6,27 @@ #include #ifdef STORED_HAVE_ZMQ -# include +# include -# ifdef STORED_HAVE_ZTH +# ifdef STORED_HAVE_ZTH // Allow Zth to provide wrappers for blocking ZMQ calls. -# include -# endif +# include +# endif -# include -# include +# include +# include -# if STORED_cplusplus < 201103L -# include -# else -# include -# endif +# if STORED_cplusplus < 201103L +# include +# else +# include +# endif namespace stored { ////////////////////////////// -// ZmqLayer +// ZmqBaseLayer // /*! @@ -34,7 +34,7 @@ namespace stored { * \param context the ZeroMQ context to use. If \c nullptr, a new context is allocated. * \param type the ZeroMQ socket type to create. */ -ZmqLayer::ZmqLayer(void* context, int type, ProtocolLayer* up, ProtocolLayer* down) +ZmqBaseLayer::ZmqBaseLayer(void* context, int type, ProtocolLayer* up, ProtocolLayer* down) : base(up, down) , m_context(context ? context : zmq_ctx_new()) , m_contextCleanup(!context) @@ -53,7 +53,7 @@ ZmqLayer::ZmqLayer(void* context, int type, ProtocolLayer* up, ProtocolLayer* do * The sockets are closed (which may block). * If a ZeroMQ context was allocated, it is terminated here. */ -ZmqLayer::~ZmqLayer() +ZmqBaseLayer::~ZmqBaseLayer() { // NOLINTNEXTLINE(cppcoreguidelines-owning-memory,cppcoreguidelines-no-malloc) free(m_buffer); @@ -68,7 +68,7 @@ ZmqLayer::~ZmqLayer() /*! * \brief The ZeroMQ context. */ -void* ZmqLayer::context() const +void* ZmqBaseLayer::context() const { return m_context; } @@ -78,7 +78,7 @@ void* ZmqLayer::context() const * * Do not use this function to manipulate the socket, only for calls like \c zmq_poll(). */ -void* ZmqLayer::socket() const +void* ZmqBaseLayer::socket() const { return m_socket; } @@ -88,23 +88,23 @@ void* ZmqLayer::socket() const * * Use this socket to determine if recv() would block. */ -ZmqLayer::fd_type ZmqLayer::fd() const +ZmqBaseLayer::fd_type ZmqBaseLayer::fd() const { fd_type socket; // NOLINT(cppcoreguidelines-init-variables) size_t size = sizeof(socket); if(zmq_getsockopt(m_socket, ZMQ_FD, &socket, &size) == -1) { -# ifdef STORED_OS_WINDOWS +# ifdef STORED_OS_WINDOWS return INVALID_SOCKET; // NOLINT(hicpp-signed-bitwise) -# else +# else return -1; -# endif +# endif } return socket; } -int ZmqLayer::block(fd_type fd, bool forReading, long timeout_us, bool suspend) +int ZmqBaseLayer::block(fd_type fd, bool forReading, long timeout_us, bool suspend) { STORED_UNUSED(fd) // Just use our socket. @@ -114,7 +114,7 @@ int ZmqLayer::block(fd_type fd, bool forReading, long timeout_us, bool suspend) /*! * \brief Like #block(fd_type,bool,long,bool), but using the #socket() by default. */ -int ZmqLayer::block(bool forReading, long timeout_us, bool suspend) +int ZmqBaseLayer::block(bool forReading, long timeout_us, bool suspend) { STORED_UNUSED(suspend) @@ -168,7 +168,7 @@ int ZmqLayer::block(bool forReading, long timeout_us, bool suspend) * \brief Try to receive a message from the ZeroMQ REP socket, and decode() it. * \param timeout_us if zero, this function does not block. -1 blocks indefinitely. */ -int ZmqLayer::recv1(long timeout_us) +int ZmqBaseLayer::recv1(long timeout_us) { int res = 0; int more = 0; @@ -250,7 +250,7 @@ int ZmqLayer::recv1(long timeout_us) * \brief Try to receive all available data from the ZeroMQ REP socket, and decode() it. * \param timeout_us if zero, this function does not block. -1 blocks indefinitely. */ -int ZmqLayer::recv(long timeout_us) +int ZmqBaseLayer::recv(long timeout_us) { bool first = true; @@ -280,7 +280,7 @@ int ZmqLayer::recv(long timeout_us) * \copydoc stored::ProtocolLayer::encode(void const*, size_t, bool) * \details Encoded data is send as REP over the ZeroMQ socket. */ -void ZmqLayer::encode(void const* buffer, size_t len, bool last) +void ZmqBaseLayer::encode(void const* buffer, size_t len, bool last) { // First try, assume we are writable. // NOLINTNEXTLINE(hicpp-signed-bitwise,cppcoreguidelines-pro-type-cstyle-cast) @@ -362,7 +362,7 @@ int DebugZmqLayer::recv(long timeout_us) * * \see #stored::Synchronizer */ -SyncZmqLayer::SyncZmqLayer( +ZmqLayer::ZmqLayer( void* context, char const* endpoint, bool listen, ProtocolLayer* up, ProtocolLayer* down) : base(context, ZMQ_DEALER, up, down) { From 9109f2396100684da634690db59e3e0d7488e912 Mon Sep 17 00:00:00 2001 From: Jochem Rutgers <68805714+jhrutgers@users.noreply.github.com> Date: Wed, 26 Nov 2025 16:14:13 +0100 Subject: [PATCH 05/14] add arith ops --- CHANGELOG.rst | 2 +- include/libstored/types.h | 117 +++++++++++++++++++++++++++----------- 2 files changed, 84 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 7da356db..9a630d30 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -24,7 +24,7 @@ The format is based on `Keep a Changelog`_, and this project adheres to Added ````` -... +- Operations such as ``-=`` and ``++`` for store variables in C++. .. _Unreleased: https://github.com/DEMCON/libstored/compare/v2.0.0...HEAD diff --git a/include/libstored/types.h b/include/libstored/types.h index ce44e77b..47e789d6 100644 --- a/include/libstored/types.h +++ b/include/libstored/types.h @@ -18,6 +18,7 @@ # if STORED_cplusplus >= 201103L # include +# include # else # include # endif @@ -350,21 +351,79 @@ struct fromType { (unsigned int)T&(unsigned int)~Type::FlagFunction)>::type type; }; -# define STORED_VARIABLE_MEMBER_ARITH_OP(Class, T, op) \ - Class& operator op##=(type v) noexcept \ - { \ - stored_assert(Type::isInt(toType::type) || Type::isFloat(toType::type)); \ - set(get() op v); \ - return *this; \ - } - -# define STORED_VARIABLE_MEMBER_ARITH_BITOP(Class, T, op) \ - Class& operator op##=(type v) noexcept \ - { \ - stored_assert(Type::isInt(toType::type)); \ - set(get() op v); \ - return *this; \ - } +# if STORED_cplusplus >= 201103L +# define STORED_VARIABLE_MEMBER_ARITH_OP(Class, T, op) \ + template < \ + typename U = T, \ + typename std::enable_if< \ + Type::isInt(toType::type) || Type::isFloat(toType::type), int>::type = \ + 0> \ + Class& operator op##=(U v) noexcept \ + { \ + set(get() op v); \ + return *this; \ + } + +# define STORED_VARIABLE_MEMBER_ARITH_BITOP(Class, T, op) \ + template < \ + typename U = T, \ + typename std::enable_if::type), int>::type = 0> \ + Class& operator op##=(U v) noexcept \ + { \ + set(get() op v); \ + return *this; \ + } + +# define STORED_VARIABLE_MEMBER_ARITH_OPOP(Class, T, op) \ + template < \ + typename U = T, \ + typename std::enable_if::type), int>::type = 0> \ + Class& operator op##op() noexcept \ + { \ + return *this op## = 1; \ + } \ + \ + template < \ + typename U = T, \ + typename std::enable_if::type), int>::type = 0> \ + U const operator op##op(int) noexcept \ + { \ + U x = get(); \ + set(x op 1); \ + return x; \ + } +# else // < C++11 +# define STORED_VARIABLE_MEMBER_ARITH_OP(Class, T, op) \ + Class& operator op##=(T v) noexcept \ + { \ + stored_assert(Type::isInt(toType::type) || Type::isFloat(toType::type)); \ + set(get() op v); \ + return *this; \ + } + +# define STORED_VARIABLE_MEMBER_ARITH_BITOP(Class, T, op) \ + Class& operator op##=(T v) noexcept \ + { \ + stored_assert(Type::isInt(toType::type)); \ + set(get() op v); \ + return *this; \ + } + +# define STORED_VARIABLE_MEMBER_ARITH_OPOP(Class, T, op) \ + Class& operator op##op() noexcept \ + { \ + stored_assert(Type::isInt(toType::type)); \ + return *this op## = 1; \ + } \ + \ + T const operator op##op(int) noexcept \ + { \ + stored_assert(Type::isInt(toType::type)); \ + T x = get(); \ + set(x op 1); \ + return x; \ + } +# endif // < C++11 # define STORED_VARIABLE_MEMBER_ARITH_OPS(Class, T) \ STORED_VARIABLE_MEMBER_ARITH_OP(Class, T, +) \ @@ -377,19 +436,8 @@ struct fromType { STORED_VARIABLE_MEMBER_ARITH_BITOP(Class, T, ^) \ STORED_VARIABLE_MEMBER_ARITH_BITOP(Class, T, <<) \ STORED_VARIABLE_MEMBER_ARITH_BITOP(Class, T, >>) \ - \ - Class& operator++() noexcept \ - { \ - stored_assert(Type::isInt(toType::type)); \ - return *this += 1; \ - } \ - \ - Class& operator--() noexcept \ - { \ - stored_assert(Type::isInt(toType::type)); \ - return *this -= 1; \ - } - + STORED_VARIABLE_MEMBER_ARITH_OPOP(Class, T, +) \ + STORED_VARIABLE_MEMBER_ARITH_OPOP(Class, T, -) template class Variant; @@ -570,7 +618,7 @@ class Variable { return sizeof(type); } - STORED_VARIABLE_MEMBER_ARITH_OPS(Variable, T) + STORED_VARIABLE_MEMBER_ARITH_OPS(Variable, type) protected: /*! @@ -781,7 +829,7 @@ class Variable : public Variable { */ void entryX() const noexcept { - container().hookEntryX(toType::type, &this->buffer(), sizeof(type)); + container().hookEntryX(toType::type, &this->buffer(), sizeof(type)); # ifdef _DEBUG stored_assert(m_entry == EntryNone); m_entry = EntryX; @@ -798,7 +846,7 @@ class Variable : public Variable { stored_assert(m_entry == EntryX); m_entry = EntryNone; # endif - container().hookExitX(toType::type, &this->buffer(), sizeof(type), changed); + container().hookExitX(toType::type, &this->buffer(), sizeof(type), changed); } /*! @@ -807,7 +855,7 @@ class Variable : public Variable { */ void entryRO() const noexcept { - container().hookEntryRO(toType::type, &this->buffer(), sizeof(type)); + container().hookEntryRO(toType::type, &this->buffer(), sizeof(type)); # ifdef _DEBUG stored_assert(m_entry == EntryNone); m_entry = EntryRO; @@ -824,10 +872,10 @@ class Variable : public Variable { stored_assert(m_entry == EntryRO); m_entry = EntryNone; # endif - container().hookExitRO(toType::type, &this->buffer(), sizeof(type)); + container().hookExitRO(toType::type, &this->buffer(), sizeof(type)); } - STORED_VARIABLE_MEMBER_ARITH_OPS(Variable, T) + STORED_VARIABLE_MEMBER_ARITH_OPS(Variable, type) private: /*! \brief The container of this Variable. */ @@ -2459,6 +2507,7 @@ class StoreVariantF { } // namespace impl # undef STORED_VARIABLE_MEMBER_ARITH_OPS +# undef STORED_VARIABLE_MEMBER_ARITH_OPOP # undef STORED_VARIABLE_MEMBER_ARITH_BITOP # undef STORED_VARIABLE_MEMBER_ARITH_OP From fd402ab95740ed48922aa26eb8b757df5ed956dd Mon Sep 17 00:00:00 2001 From: Jochem Rutgers <68805714+jhrutgers@users.noreply.github.com> Date: Wed, 26 Nov 2025 16:16:10 +0100 Subject: [PATCH 06/14] fix lossy layer and reduce arq retransmits --- CHANGELOG.rst | 2 + include/libstored/protocol.h | 8 ++++ src/protocol.cpp | 91 ++++++++++++++++++++++++++++++++++-- version/version.txt | 2 +- 4 files changed, 98 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 9a630d30..5cf44645 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -25,6 +25,8 @@ Added ````` - Operations such as ``-=`` and ``++`` for store variables in C++. +- Don't auto-retransmit on encode by the ``stored::ArqLayer``. Only retransmit + on ``keepAlive()``. .. _Unreleased: https://github.com/DEMCON/libstored/compare/v2.0.0...HEAD diff --git a/include/libstored/protocol.h b/include/libstored/protocol.h index aeb8cc4b..7e2ab4b4 100644 --- a/include/libstored/protocol.h +++ b/include/libstored/protocol.h @@ -993,11 +993,19 @@ class LossyLayer : public ProtocolLayer { float ber() const; void ber(float ber); + size_t errors() const; + +protected: + bool error(int threshold); private: float m_ber; int m_bitThreshold; int m_byteThreshold; + size_t m_errors; +# ifdef STORED_OS_POSIX + unsigned int m_seed; +# endif // STORED_OS_POSIX }; /*! diff --git a/src/protocol.cpp b/src/protocol.cpp index ee195a96..15457321 100644 --- a/src/protocol.cpp +++ b/src/protocol.cpp @@ -662,6 +662,8 @@ bool ArqLayer::waitingForAck() const void ArqLayer::encode(void const* buffer, size_t len, bool last) { + bool isIdle = !waitingForAck(); + if(m_maxEncodeBuffer > 0 && m_maxEncodeBuffer < m_encodeQueueSize + len + 1U /* seq */) event(EventEncodeBufferOverflow); @@ -685,7 +687,8 @@ void ArqLayer::encode(void const* buffer, size_t len, bool last) break; } - transmit(); + if(isIdle) + transmit(); } bool ArqLayer::flush() @@ -1727,30 +1730,110 @@ bool PrintLayer::enabled() const // LossyLayer // +/*! + * \brief Ctor. + * + * The \c ber (bit error rate, between 0.0 and 1.0) is the probability that a + * bit is flipped for every bit passing this layer. + */ LossyLayer::LossyLayer(float ber, ProtocolLayer* up, ProtocolLayer* down) : base(up, down) , m_ber() , m_bitThreshold() , m_byteThreshold() + , m_errors() +#ifdef STORED_OS_POSIX + // NOLINTNEXTLINE + , m_seed((unsigned int)(uintptr_t)this) +#endif // STORED_OS_POSIX { this->ber(ber); } +/*! + * \brief Check if an error should be injected, given a threshold. + * + * The threshold is a value between 0 and \c RAND_MAX. The higher the threshold, + * the more likely an error is injected. + */ +bool LossyLayer::error(int threshold) +{ +#ifdef STORED_OS_POSIX + bool res = rand_r(&m_seed) < threshold; +#else // !STORED_OS_POSIX + bool res = rand() < threshold; +#endif // !STORED_OS_POSIX + if(res) + m_errors++; + return res; +} + +/*! + * \brief Number of injected errors. + */ +size_t LossyLayer::errors() const +{ + return m_errors; +} + void LossyLayer::decode(void* buffer, size_t len) { - base::decode(buffer, len); + uint8_t* buffer_ = static_cast(buffer); + + for(size_t i = 0; i < len; i++) { + if(!error(m_byteThreshold)) + continue; + + // Inject error. + buffer_[i] = (uint8_t)(buffer_[i] ^ 0x01U); + for(unsigned int b = 1U; b < 8U; b++) + if(error(m_bitThreshold)) + buffer_[i] = (uint8_t)(buffer_[i] ^ (0x01U << b)); + } + + base::decode(buffer_, len); } void LossyLayer::encode(void const* buffer, size_t len, bool last) { - base::encode(buffer, len, last); + uint8_t const* buffer_ = static_cast(buffer); + + size_t i = 0; + while(i < len) { + // First part without errors. + size_t c = i; + for(; c < len && !error(m_byteThreshold); c++) + ; + if(c > i) + base::encode(buffer_ + i, c - i, i + c >= len && last); + + i += c; + if(i >= len) + break; + + // Inject an error byte. + uint8_t e = (uint8_t)(buffer_[i] ^ 0x01U); + for(unsigned int b = 1U; b < 8U; b++) + if(error(m_bitThreshold)) + e = (uint8_t)(e ^ (0x01U << b)); + + i++; + base::encode(&e, 1U, i >= len && last); + } } +/*! + * \brief The configured bit error rate (BER). + */ float LossyLayer::ber() const { return m_ber; } +/*! + * \brief Set a new bit error rate. + * \param ber the new bit error rate, between 0.0 and 1.0 + */ void LossyLayer::ber(float ber) { if(ber < 0.F || std::isnan(ber)) @@ -1762,7 +1845,7 @@ void LossyLayer::ber(float ber) m_bitThreshold = (int)(std::min(RAND_MAX, (unsigned int)(ber * (float)RAND_MAX))); m_byteThreshold = (int)(std::min( - RAND_MAX, (unsigned int)(std::pow(ber, 8.F) * (float)RAND_MAX))); + RAND_MAX, (unsigned int)((1.F - std::pow((1.F - ber), 8.F)) * (float)RAND_MAX))); } diff --git a/version/version.txt b/version/version.txt index 2e21232a..ccd4e8c1 100644 --- a/version/version.txt +++ b/version/version.txt @@ -1 +1 @@ -2.0.1-alpha +2.1.0-alpha From 21c2a070d29a03075e98bbd862c916f721bee137 Mon Sep 17 00:00:00 2001 From: Jochem Rutgers <68805714+jhrutgers@users.noreply.github.com> Date: Wed, 26 Nov 2025 16:16:19 +0100 Subject: [PATCH 07/14] improve example --- .vscode/c_cpp_properties.json | 5 +- .vscode/settings.json | 4 +- examples/lossy_sync/ExampleSync.st | 6 + examples/lossy_sync/main.cpp | 415 ++++++++++++++++++++++------- 4 files changed, 329 insertions(+), 101 deletions(-) diff --git a/.vscode/c_cpp_properties.json b/.vscode/c_cpp_properties.json index 0eaf9dc7..4e3ae6aa 100644 --- a/.vscode/c_cpp_properties.json +++ b/.vscode/c_cpp_properties.json @@ -3,6 +3,7 @@ { "name": "Linux", "includePath": [ + "${workspaceFolder}/include", "${workspaceFolder}/**" ], "defines": [ @@ -16,6 +17,7 @@ { "name": "Win32", "includePath": [ + "${workspaceFolder}/include", "${workspaceFolder}/**" ], "defines": [ @@ -27,6 +29,7 @@ { "name": "Mac", "includePath": [ + "${workspaceFolder}/include", "${workspaceFolder}/**" ], "defines": [ @@ -39,4 +42,4 @@ } ], "version": 4 -} \ No newline at end of file +} diff --git a/.vscode/settings.json b/.vscode/settings.json index 92a44910..7b71903c 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -23,7 +23,9 @@ "C_Cpp.doxygen.generatedStyle": "/*!", "files.exclude": { "**/.git": true, - "**/dist/venv/**": true + "**/dist/venv/**": true, + "python/libstored/data/**": true, + "python/libstored-*/**": true }, "clang-format.executable": "clang-format-15", "cmakeFormat.env": { diff --git a/examples/lossy_sync/ExampleSync.st b/examples/lossy_sync/ExampleSync.st index 42224ef4..ff76e959 100644 --- a/examples/lossy_sync/ExampleSync.st +++ b/examples/lossy_sync/ExampleSync.st @@ -2,6 +2,12 @@ // // SPDX-License-Identifier: CC0-1.0 +uint32 client heartbeat +uint32 server heartbeat + int32 restarted int32 i double d + +(float) ber +(uint32) errors diff --git a/examples/lossy_sync/main.cpp b/examples/lossy_sync/main.cpp index f5ad5633..855d3bde 100644 --- a/examples/lossy_sync/main.cpp +++ b/examples/lossy_sync/main.cpp @@ -1,4 +1,4 @@ -// SPDX-FileCopyrightText: 2020-20255555 Jochem Rutgers +// SPDX-FileCopyrightText: 2020-2025 Jochem Rutgers // // SPDX-License-Identifier: CC0-1.0 @@ -11,6 +11,7 @@ #include "ExampleSync.h" #include +#include #include #include #include @@ -19,6 +20,42 @@ #include +enum { + PollInterval_ms = 100, + SyncInterval_ms = PollInterval_ms * 5, + IdleTimeout_ms = SyncInterval_ms, + DisconnectTimeout_ms = IdleTimeout_ms * 10, + HeartbeatInterval_ms = 1000, + ReconnectDelay_ms = DisconnectTimeout_ms + IdleTimeout_ms * 2, +}; + + +///////////////////////////////////////////////////////////////////////// +// Logging +// + +static std::function logger_callback; + +__attribute__((format(printf, 1, 0))) static void logv(char const* format, va_list args) +{ + static char msg[1024]; + vsnprintf(msg, sizeof(msg), format, args); + fputs(msg, stderr); + + if(logger_callback) + logger_callback(msg); +} + +__attribute__((format(printf, 1, 2))) static void log(char const* format, ...) +{ + va_list args; + va_start(args, format); + logv(format, args); + va_end(args); +} + + + ///////////////////////////////////////////////////////////////////////// // The store // @@ -27,6 +64,37 @@ class ExampleSync : public STORE_T(ExampleSync, stored::Synchronizable, stored:: STORE_CLASS(ExampleSync, stored::Synchronizable, stored::ExampleSyncBase) public: ExampleSync() is_default + + void __ber(bool set, float& value) noexcept + { + auto l = m_lossyLayer.lock(); + if(set) + l->ber(value); + else + value = l ? l->ber() : std::numeric_limits::quiet_NaN(); + } + + void __errors(bool set, uint32_t& value) noexcept + { + if(set) + return; + + auto l = m_lossyLayer.lock(); + value = l ? static_cast(l->errors()) : 0U; + } + + void setLossyLayer(std::shared_ptr&& layer) noexcept + { + m_lossyLayer = std::move(layer); + } + + void setLossyLayer(std::shared_ptr const& layer) noexcept + { + m_lossyLayer = layer; + } + +private: + std::weak_ptr m_lossyLayer; }; @@ -35,32 +103,27 @@ class ExampleSync : public STORE_T(ExampleSync, stored::Synchronizable, stored:: // Argument parsing and help // -static int parse_port(char const* str) -{ - char* endptr = nullptr; - long port = strtol(str, &endptr, 0); - if(*endptr || port <= 0 || port >= 0x10000) - throw std::invalid_argument{"Invalid port"}; - return (int)port; -} - static void print_help(FILE* out, char const* progname) { - fprintf(out, "Usage: %s [-h] [-v] [-p ] {-s |-c }\n", progname); + fprintf(out, "Usage: %s [-h] [-v] [-p ] {-s |-c } [-b ]\n", + progname); fprintf(out, "where\n"); fprintf(out, " -h Show this help message.\n"); - fprintf(out, " -s Server 0MQ endpoint for downstream sync.\n"); - fprintf(out, " -c Client 0MQ endpoint for upstream sync.\n"); + fprintf(out, " -s Server 0MQ endpoint for downstream sync, such as: tcp://*:5555\n"); + fprintf(out, + " -c Client 0MQ endpoint for upstream sync, such as: tcp://localhost:5555\n"); fprintf(out, " -p Set debugger's port. Default: %d\n", stored::DebugZmqLayer::DefaultPort); fprintf(out, " -v Verbose output of sync connections.\n"); + fprintf(out, " -b Bit error rate (BER) for lossy channel. Default: 0\n"); } struct Arguments { bool verbose = false; int debug_port = stored::DebugZmqLayer::DefaultPort; - int client_port = 0; - int server_port = 0; + std::string client; + std::string server; + float ber = 0; }; class exit_now : public std::exception {}; @@ -71,14 +134,16 @@ static Arguments parse_arguments(int argc, char** argv) int c; // flawfinder: ignore - while((c = getopt(argc, argv, "hs:c:p:v")) != -1) { + while((c = getopt(argc, argv, "hs:c:p:vb:")) != -1) { switch(c) { case 'p': try { - args.debug_port = parse_port(optarg); - } catch(std::invalid_argument&) { - fprintf(stderr, "Invalid debug port '%s'\n", optarg); - throw; + int port = std::stoi(optarg); + if(port <= 0 || port >= 0x10000) + throw std::invalid_argument{"Invalid port"}; + args.debug_port = port; + } catch(std::exception& e) { + throw std::invalid_argument{e.what()}; } break; case 'v': @@ -86,19 +151,20 @@ static Arguments parse_arguments(int argc, char** argv) printf("Enable verbose output\n"); break; case 's': - try { - args.server_port = parse_port(optarg); - } catch(std::invalid_argument&) { - fprintf(stderr, "Invalid server port '%s'\n", optarg); - throw; - } + args.server = optarg; break; case 'c': + args.client = optarg; + break; + case 'b': try { - args.client_port = parse_port(optarg); + args.ber = std::stof(optarg); + if(args.ber < 0.F || args.ber > 1.F) + throw std::invalid_argument{"Invalid BER"}; } catch(std::invalid_argument&) { - fprintf(stderr, "Invalid client port '%s'\n", optarg); throw; + } catch(std::exception& e) { + throw std::invalid_argument{e.what()}; } break; case 'h': @@ -110,13 +176,13 @@ static Arguments parse_arguments(int argc, char** argv) } } - if(args.client_port && args.server_port) { - fprintf(stderr, "Cannot be both client and server\n"); + if(!args.client.empty() && !args.server.empty()) { + log("Cannot be both client and server\n"); throw std::invalid_argument{""}; } - if(!args.client_port && !args.server_port) { - fprintf(stderr, "Must be either client or server\n"); + if(args.client.empty() && args.server.empty()) { + log("Must be either client or server\n"); throw std::invalid_argument{""}; } @@ -129,24 +195,36 @@ static Arguments parse_arguments(int argc, char** argv) // The stacks // +class disconnected : public std::exception {}; + /*! * \brief ZeroMQ interface for the debugger. */ class DebugStack { STORED_CLASS_NOCOPY(DebugStack) public: - explicit DebugStack(ExampleSync& store, int port) - : m_debugger{"lossy_sync"} - , m_debugLayer{nullptr, port} + explicit DebugStack(ExampleSync& store, int port, char const* name = nullptr) + : m_debugLayer{nullptr, port} { if((errno = m_debugLayer.lastError())) { - fprintf(stderr, "Cannot initialize ZMQ for debugging, got error %d; %s\n", - errno, zmq_strerror(errno)); + log("Cannot initialize ZMQ for debugging, got error %d; %s\n", errno, + zmq_strerror(errno)); throw std::runtime_error{"ZMQ initialization failed"}; } + m_id = "lossy_sync"; + if(name) + m_id += std::string{" ("} + name + ")"; + m_debugger.setIdentification(m_id.c_str()); + m_debugger.map(store); m_debugLayer.wrap(m_debugger); + logger_callback = [&](char const* msg) { m_debugger.stream('l', msg); }; + } + + ~DebugStack() noexcept + { + logger_callback = nullptr; } stored::Pollable& pollable() @@ -154,7 +232,7 @@ class DebugStack { return m_pollable; } - void recv() + void process() { int res = m_debugLayer.recv(); @@ -163,13 +241,13 @@ class DebugStack { case EAGAIN: return; default: - fprintf(stderr, "Debugger recv failed with error %d; %s\n", res, - zmq_strerror(res)); + log("Debugger recv failed with error %d; %s\n", res, zmq_strerror(res)); break; } } private: + std::string m_id; stored::Debugger m_debugger; stored::DebugZmqLayer m_debugLayer; stored::PollableZmqSocket m_pollable{m_debugLayer.socket(), stored::Pollable::PollIn}; @@ -183,21 +261,40 @@ class SyncStack { public: explicit SyncStack( ExampleSync& store, char const* endpoint, bool server, bool verbose, float ber = 0) - : m_syncLayer(nullptr, endpoint, server) + : m_zmqLayer(nullptr, endpoint, server) { - if((errno = m_syncLayer.lastError())) { - fprintf(stderr, "Cannot initialize ZMQ for sync, got error %d; %s\n", errno, - zmq_strerror(errno)); + if((errno = m_zmqLayer.lastError())) { + log("Cannot initialize ZMQ for sync, got error %d; %s\n", errno, + zmq_strerror(errno)); throw std::runtime_error{"ZMQ initialization failed"}; } + int linger = 0; + if(zmq_setsockopt(m_zmqLayer.socket(), ZMQ_LINGER, &linger, sizeof(linger)) == -1) { + log("Cannot set ZMQ_LINGER, got error %d; %s\n", errno, + zmq_strerror(errno)); + throw std::runtime_error{"ZMQ setsockopt failed"}; + } + + if(verbose) + wrap(stdout, "sync"); + // We don't want to do ARQ on large messages, so we segment them to some // appropriate size. wrap(32U); - // Perform retransmits. - m_arq = &wrap(); + // Perform retransmits. Limit the encode queue to 10 KiB. + m_arq = wrap(10240U); + m_arq->setEventCallback( + [](stored::ArqLayer&, stored::ArqLayer::Event event, void* arg) { + static_cast(arg)->event(event); + }, + this); // Check if we have communication at all. - m_idle = &wrap(); + m_idle = wrap(); + + if(verbose) + wrap(stdout, "arq"); + // Do CRC checks. Do this below the ARQ, such that the ARQ sees no or // correct messages. wrap(); @@ -205,15 +302,19 @@ class SyncStack { wrap(); // Framing. wrap(); - if(ber > 0) + if(server) // The server simulates a lossy channel. - wrap(ber); - // Verbose output. + store.setLossyLayer(wrap(ber)); + + // Optional: buffer partial messages to reduce the number of sends/receives on the + // wire. + wrap(); + if(verbose) - wrap(); + wrap(stdout, "raw"); // Connect to I/O. - m_syncLayer.wrap(*m_layers.back()); + m_zmqLayer.wrap(*m_layers.back()); // Register the store... m_synchronizer.map(store); @@ -221,8 +322,19 @@ class SyncStack { m_synchronizer.connect(**m_layers.begin()); // There we go! - if(!server) - m_synchronizer.syncFrom(store, m_syncLayer); + auto now = std::chrono::steady_clock::now(); + m_idleUpSince = now; + m_idleDownSince = now; + m_lastSync = now; + m_lastHeartbeat = now; + m_heartbeat = server ? store.server_heartbeat.variable() + : store.client_heartbeat.variable(); + + if(!server) { + m_synchronizer.syncFrom(store, *m_layers.front()); + m_connected = true; + m_arq->keepAlive(); + } } stored::Pollable& pollable() @@ -230,42 +342,136 @@ class SyncStack { return m_pollable; } + void process() + { + auto now = std::chrono::steady_clock::now(); + + recv(); + doSync(now); + checkRetransmit(now); + checkDisconnect(now); + doHeartbeat(now); + + m_idle->setIdle(); + } + + bool connected() const + { + return m_connected; + } + +protected: + template + std::shared_ptr wrap(Args&&... args) + { + auto* p = new T{std::forward(args)...}; + std::shared_ptr layer{p}; + + if(!m_layers.empty()) + layer->wrap(*m_layers.back()); + + m_layers.emplace_back(layer); + return layer; + } + void recv() { - int res = m_syncLayer.recv(); + // Process incoming messages. + int res = m_zmqLayer.recv(); switch(res) { case 0: case EAGAIN: return; default: - fprintf(stderr, "Sync recv failed with error %d; %s\n", res, - zmq_strerror(res)); + log("Sync recv failed with error %d; %s\n", res, zmq_strerror(res)); break; } } -protected: - template - T& wrap(Args&&... args) + void doSync(std::chrono::time_point const& now) { - auto* p = new T{std::forward(args)...}; - std::unique_ptr layer{p}; + if(now - m_lastSync >= std::chrono::milliseconds(SyncInterval_ms)) { + m_lastSync = now; + m_synchronizer.process(); + } + } - if(!m_layers.empty()) - layer->wrap(*m_layers.back()); + void checkRetransmit(std::chrono::time_point const& now) + { + if(!connected()) + return; - m_layers.emplace_back(std::move(layer)); - return *p; + if(m_idle->idleDown()) { + auto dt = now - m_idleDownSince; + if(dt > std::chrono::milliseconds(IdleTimeout_ms)) { + m_arq->keepAlive(); + m_idleDownSince = now; + } + } else { + m_idleDownSince = now; + } + } + + void checkDisconnect(std::chrono::time_point const& now) + { + if(connected()) { + if(m_idle->idleUp()) { + auto dt = now - m_idleUpSince; + if(dt > std::chrono::milliseconds(DisconnectTimeout_ms)) { + log("No upstream activity, disconnecting\n"); + throw disconnected{}; + } + } else { + m_idleUpSince = now; + } + } else if(!m_idle->idleUp()) { + log("Upstream activity detected, connected\n"); + m_connected = true; + m_idleUpSince = now; + } + } + + void doHeartbeat(std::chrono::time_point const& now) + { + auto dt = now - m_lastHeartbeat; + if(dt >= std::chrono::milliseconds(HeartbeatInterval_ms)) { + m_lastHeartbeat = now; + m_heartbeat++; + } + } + + void event(stored::ArqLayer::Event event) + { + switch(event) { + case stored::ArqLayer::EventEncodeBufferOverflow: + log("ARQ encode buffer overflow\n"); + throw disconnected{}; + case stored::ArqLayer::EventReconnect: + log("ARQ reconnect event\n"); + // We need to reinitialize the synchronizer state. + throw disconnected{}; + case stored::ArqLayer::EventRetransmit: + log("ARQ retransmit limit exceeded, ignored\n"); + break; + default: + break; + } } private: stored::Synchronizer m_synchronizer; - stored::ArqLayer* m_arq = nullptr; - stored::IdleCheckLayer* m_idle = nullptr; - std::list> m_layers; - stored::SyncZmqLayer m_syncLayer; - stored::PollableZmqSocket m_pollable{m_syncLayer.socket(), stored::Pollable::PollIn}; + std::shared_ptr m_arq; + std::shared_ptr m_idle; + std::list> m_layers; + stored::ZmqLayer m_zmqLayer; + stored::PollableZmqSocket m_pollable{m_zmqLayer.socket(), stored::Pollable::PollIn}; + std::chrono::time_point m_idleUpSince; + std::chrono::time_point m_idleDownSince; + std::chrono::time_point m_lastSync; + std::chrono::time_point m_lastHeartbeat; + stored::Variable m_heartbeat; + bool m_connected = false; }; @@ -274,28 +480,14 @@ class SyncStack { // Main function // -class disconnected : public std::exception {}; - static void run(Arguments const& args, ExampleSync& store, DebugStack& debugStack) { std::unique_ptr syncStack; - if(args.client_port) { - char endpoint[32]{}; - int res = snprintf( - endpoint, sizeof(endpoint), "tcp://localhost:%d", args.client_port); - if(res < 0 || (size_t)res >= sizeof(endpoint)) - throw std::runtime_error{"Endpoint string too long"}; - - syncStack.reset(new SyncStack{store, endpoint, false, args.verbose}); - } else if(args.server_port) { - char endpoint[32]{}; - int res = snprintf( - endpoint, sizeof(endpoint), "tcp://localhost:%d", args.server_port); - if(res < 0 || (size_t)res >= sizeof(endpoint)) - throw std::runtime_error{"Endpoint string too long"}; - - syncStack.reset(new SyncStack{store, endpoint, true, args.verbose}); + if(!args.client.empty()) { + syncStack.reset(new SyncStack{store, args.client.c_str(), false, args.verbose}); + } else if(!args.server.empty()) { + syncStack.reset(new SyncStack{store, args.server.c_str(), true, args.verbose}); } stored::Poller poller; @@ -309,28 +501,53 @@ static void run(Arguments const& args, ExampleSync& store, DebugStack& debugStac throw std::runtime_error{"Poller add failed"}; } - while(true) { - poller.poll(200); - debugStack.recv(); - syncStack->recv(); + try { + while(true) { + poller.poll(PollInterval_ms); + debugStack.process(); + syncStack->process(); + } + } catch(disconnected&) { + poller.remove(syncStack->pollable()); + syncStack.reset(); + + log("Disconnected, lingering...\n"); + + auto now = std::chrono::steady_clock::now(); + auto end = now + std::chrono::milliseconds(ReconnectDelay_ms); + + while(now < end) { + poller.poll(PollInterval_ms); + debugStack.process(); + now = std::chrono::steady_clock::now(); + } + + throw; } } int main(int argc, char** argv) { +#ifdef STORED_OS_WINDOWS + setvbuf(stdout, nullptr, _IONBF, 0); +#else + setvbuf(stdout, nullptr, _IOLBF, 0); +#endif + srand((unsigned int)time(nullptr)); + try { Arguments args = parse_arguments(argc, argv); ExampleSync store; - DebugStack debugStack{store, args.debug_port}; + DebugStack debugStack{ + store, args.debug_port, args.client.empty() ? "server" : "client"}; while(true) { try { run(args, store, debugStack); } catch(disconnected&) { - fprintf(stderr, "Disconnected, restarting...\n"); - std::this_thread::sleep_for(std::chrono::seconds(1)); - ++store.restarted; + log("Restarting...\n"); + store.restarted++; } } } catch(exit_now&) { @@ -338,10 +555,10 @@ int main(int argc, char** argv) } catch(std::invalid_argument&) { return 1; } catch(std::exception& e) { - fprintf(stderr, "Error: %s\n", e.what()); + log("Error: %s\n", e.what()); return 2; } catch(...) { - fprintf(stderr, "Unknown error\n"); + log("Unknown error\n"); return 3; } } From bc31f8d678e90f091c3ac8ae5d4f5a1ae0ae0242 Mon Sep 17 00:00:00 2001 From: Jochem Rutgers <68805714+jhrutgers@users.noreply.github.com> Date: Thu, 27 Nov 2025 16:07:02 +0100 Subject: [PATCH 08/14] improve arq/sync recovery --- .vscode/settings.json | 4 +- examples/lossy_sync/main.cpp | 3 +- include/libstored/macros.h | 1 + include/libstored/protocol.h | 225 ++++++++++++++++++++++++------- include/libstored/synchronizer.h | 60 ++++----- src/protocol.cpp | 88 +++++++----- src/synchronizer.cpp | 7 + 7 files changed, 274 insertions(+), 114 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 7b71903c..3ff625b1 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -32,5 +32,7 @@ "PYTHONPATH": "${workspaceFolder}/dist/venv" }, "python.analysis.typeCheckingMode": "standard", - "python.defaultInterpreterPath": "${workspaceFolder}/dist/venv/bin/python3" + "python.defaultInterpreterPath": "${workspaceFolder}/dist/venv/bin/python3", + "vim.textwidth": 100, + "vim.tabstop": 8, } diff --git a/examples/lossy_sync/main.cpp b/examples/lossy_sync/main.cpp index 855d3bde..5164db82 100644 --- a/examples/lossy_sync/main.cpp +++ b/examples/lossy_sync/main.cpp @@ -449,8 +449,7 @@ class SyncStack { throw disconnected{}; case stored::ArqLayer::EventReconnect: log("ARQ reconnect event\n"); - // We need to reinitialize the synchronizer state. - throw disconnected{}; + break; case stored::ArqLayer::EventRetransmit: log("ARQ retransmit limit exceeded, ignored\n"); break; diff --git a/include/libstored/macros.h b/include/libstored/macros.h index 7ef1aa07..ce2ad75f 100644 --- a/include/libstored/macros.h +++ b/include/libstored/macros.h @@ -270,6 +270,7 @@ typedef SSIZE_T ssize_t; # if !defined(STORED_cpp_exceptions) # define try if(true) # define catch(...) if(false) +# define throw std::terminate(), (void) # endif # if !defined(STORED_cpp_rtti) && defined(__cpp_rtti) diff --git a/include/libstored/protocol.h b/include/libstored/protocol.h index 7e2ab4b4..5a19101c 100644 --- a/include/libstored/protocol.h +++ b/include/libstored/protocol.h @@ -66,29 +66,59 @@ class ProtocolLayer { virtual ~ProtocolLayer(); +protected: /*! * \brief Change the layer that receives our decoded frames. * \param up the layer, which can be \c nullptr */ - void setUp(ProtocolLayer* up = nullptr) + void justSetUp(ProtocolLayer* up = nullptr) noexcept { m_up = up; - connected(); } +public: + /*! + * \brief Change the layer that receives our decoded frames. + * \param up the layer, which can be \c nullptr + * + * It calls #disconnected() and #connected() appropriately. + */ + void setUp(ProtocolLayer* up = nullptr) + { + ProtocolLayer* oldUp = m_up; + justSetUp(up); + + if(oldUp) + oldUp->disconnected(); + if(up) + up->connected(); + } + +protected: /*! * \brief Change the layer that receives our encoded frames. * \param down the layer, which can be \c nullptr */ - void setDown(ProtocolLayer* down = nullptr) + void justSetDown(ProtocolLayer* down = nullptr) noexcept { m_down = down; } +public: + /*! + * \brief Change the layer that receives our encoded frames. + * \param down the layer, which can be \c nullptr + */ + void setDown(ProtocolLayer* down = nullptr) + { + // For symmetry with setUp(). + justSetDown(down); + } + /*! * \brief Return the lowest layer of the stack. */ - ProtocolLayer& bottom() + ProtocolLayer& bottom() noexcept { ProtocolLayer* p = this; @@ -101,7 +131,7 @@ class ProtocolLayer { /*! * \brief Return the lowest layer of the stack. */ - ProtocolLayer const& bottom() const + ProtocolLayer const& bottom() const noexcept { ProtocolLayer const* p = this; @@ -114,7 +144,7 @@ class ProtocolLayer { /*! * \brief Return the highest layer of the stack. */ - ProtocolLayer& top() + ProtocolLayer& top() noexcept { ProtocolLayer* p = this; @@ -127,7 +157,7 @@ class ProtocolLayer { /*! * \brief Return the highest layer of the stack. */ - ProtocolLayer const& top() const + ProtocolLayer const& top() const noexcept { ProtocolLayer const* p = this; @@ -148,18 +178,38 @@ class ProtocolLayer { */ ProtocolLayer& wrap(ProtocolLayer& up) { - ProtocolLayer* b = &bottom(); - ProtocolLayer* d = up.down(); + // Disconnect our old upper layer. + ProtocolLayer* old_up = this->up(); + if(old_up) { + old_up->justSetDown(nullptr); + justSetUp(nullptr); + } - if(d) { - b->setDown(d); - d->setUp(b); - b = &d->bottom(); + // Inject ourselves below the given layer. + ProtocolLayer* current_bottom = &bottom(); + ProtocolLayer* inject_above = up.down(); + + if(inject_above) { + current_bottom->justSetDown(inject_above); + inject_above->justSetUp(current_bottom); + current_bottom = &inject_above->bottom(); } - up.setDown(this); - setUp(&up); - return *b; + // Set our new upper layer. + up.justSetDown(this); + justSetUp(&up); + + // Invoke all notifications. If an exception would be thrown here, the stack is + // updated appropriately. + if(old_up) + old_up->disconnected(); + + if(inject_above) + current_bottom->connected(); + else + up.connected(); + + return *current_bottom; } /*! @@ -173,27 +223,39 @@ class ProtocolLayer { */ ProtocolLayer& stack(ProtocolLayer& down) { - ProtocolLayer* u = down.up(); + // Disconnect our old lower layer. + ProtocolLayer* old_down = this->down(); + if(old_down) + old_down->justSetUp(nullptr); + + // Inject ourselves above the given layer. + ProtocolLayer* current_top = &top(); + ProtocolLayer* inject_below = down.up(); + + if(inject_below) { + inject_below->justSetDown(current_top); + current_top->justSetUp(inject_below); + current_top = &inject_below->top(); + } - setDown(&down); - down.setUp(this); + justSetDown(&down); + down.justSetUp(this); - ProtocolLayer* t = &top(); + // Invoke all notifications. If an exception would be thrown here, the stack is + // updated appropriately. + if(old_down) + old_down->disconnected(); - if(u) { - u->setDown(t); - t->setUp(u); - t = &u->top(); - } + connected(); - return *t; + return *current_top; } /*! * \brief Returns the layer above this one. * \return the layer, or \c nullptr if there is none. */ - ProtocolLayer* up() const + ProtocolLayer* up() const noexcept { return m_up; } @@ -202,7 +264,7 @@ class ProtocolLayer { * \brief Returns the layer below this one. * \return the layer, or \c nullptr if there is none. */ - ProtocolLayer* down() const + ProtocolLayer* down() const noexcept { return m_down; } @@ -242,12 +304,12 @@ class ProtocolLayer { * \brief Flags the current response as purgeable. * * This may influence how a response is handled. Especially, in case - * of retransmits of lost packets, one may decide to either reexecute + * of retransmits of lost packets, one may decide to either re-execute * the command, or to save the first response and resend it when the * command was retransmitted. In that sense, a precious response * (default) means that every layer should handle the data with case, * as it cannot be recovered once it is lost. When the response is - * flagged purgeeble, the response may be thrown away after the first + * flagged purgeable, the response may be thrown away after the first * try to transmit it to the client. * * By default, all responses are precious. @@ -305,6 +367,15 @@ class ProtocolLayer { up()->connected(); } + /*! + * \brief Disconnected notification (bottom-up). + */ + virtual void disconnected() + { + if(up()) + up()->disconnected(); + } + private: /*! \brief The layer above this one. */ ProtocolLayer* m_up; @@ -471,8 +542,8 @@ class SegmentationLayer : public ProtocolLayer { * not have a payload (so, no decode() has to be invoked upon receive), should set bit 6. This also * applies to the reset message. Bit 6 is implied for an ack. * - * Retransmits are triggered every time a message is queued for encoding, or when #flush() is - * called. There is no timeout specified. + * Retransmits are triggered only upon #keepAlive() or when #flush() is called. There is no timeout + * specified. * * One may decide to use a #stored::SegmentationLayer higher in the protocol stack to reduce the * amount of data to retransmit when a message is lost (only one segment is retransmitted, not the @@ -499,8 +570,10 @@ class SegmentationLayer : public ProtocolLayer { * A -> B: ack (0x80) * \endverbatim * - * Queued messages are retransmitted after the reset, although they may be duplicated when an ack is - * lost during the reset. Messages are never completely lost. + * When a reset is received, it is unknown what happened to the previous messages; the last + * unacknowledged messages may or may not have been received. Therefore, the session is assumed to + * be reset, the encode queue is dropped, and the connection is considered disconnected, until the + * ack on the reset is received. */ class ArqLayer : public ProtocolLayer { STORED_CLASS_NOCOPY(ArqLayer) @@ -532,6 +605,8 @@ class ArqLayer : public ProtocolLayer { virtual bool flush() override; virtual void reset() override; virtual void connected() override; + virtual void disconnected() override; + bool isConnected() const; void keepAlive(); enum Event { @@ -630,6 +705,7 @@ class ArqLayer : public ProtocolLayer { void popEncodeQueue(); void pushEncodeQueue(void const* buffer, size_t len, bool back = true); String::type& pushEncodeQueueRaw(bool back = true); + void pushReset(); private: # if STORED_cplusplus < 201103L @@ -644,6 +720,7 @@ class ArqLayer : public ProtocolLayer { Deque::type m_spare; size_t m_encodeQueueSize; EncodeState m_encodeState; + bool m_connected; bool m_pauseTransmit; bool m_didTransmit; uint8_t m_retransmits; @@ -1085,48 +1162,62 @@ class IdleCheckLayer : public ProtocolLayer { }; # if STORED_cplusplus >= 201103L -template +template class CallbackLayer; template static inline CallbackLayer< - typename std::decay::type, typename std::decay::type, void (*)()> + typename std::decay::type, typename std::decay::type, void (*)(), void (*)()> make_callback(Up&& up, Down&& down); template static inline CallbackLayer< typename std::decay::type, typename std::decay::type, - typename std::decay::type> + typename std::decay::type, void (*)()> make_callback(Up&& up, Down&& down, Connected&& connected); +template +static inline CallbackLayer< + typename std::decay::type, typename std::decay::type, + typename std::decay::type, typename std::decay::type> +make_callback(Up&& up, Down&& down, Connected&& connected, Disconnected&& disconnected); + /*! * \brief Callback class that invokes a callback for every messages through the stack. * * \copydetails #stored::make_callback() */ -template +template class CallbackLayer : public ProtocolLayer { public: typedef ProtocolLayer base; protected: - template - CallbackLayer(U&& u, D&& d, C&& c) + template + CallbackLayer(U&& u, D&& d, C&& c, Z&& z) : m_up{std::forward(u)} , m_down{std::forward(d)} , m_connected{std::forward(c)} + , m_disconnected{std::forward(z)} {} template - friend CallbackLayer::type, typename std::decay::type, void (*)()> + friend CallbackLayer< + typename std::decay::type, typename std::decay::type, void (*)(), void (*)()> make_callback(U&& up, D&& down); template friend CallbackLayer< typename std::decay::type, typename std::decay::type, - typename std::decay::type> + typename std::decay::type, void (*)()> make_callback(U&& up, D&& down, C&& connected); + template + friend CallbackLayer< + typename std::decay::type, typename std::decay::type, + typename std::decay::type, typename std::decay::type> + make_callback(U&& up, D&& down, C&& connected, Z&& disconnected); + public: CallbackLayer(CallbackLayer&& l) noexcept : m_up{std::move(l.m_up)} @@ -1159,6 +1250,12 @@ class CallbackLayer : public ProtocolLayer { base::connected(); } + virtual void disconnected() override + { + m_disconnected(); + base::disconnected(); + } + # ifndef DOXYGEN using base::encode; # endif @@ -1167,6 +1264,7 @@ class CallbackLayer : public ProtocolLayer { Up m_up; Down m_down; Connected m_connected; + Disconnected m_disconnected; }; /*! @@ -1185,12 +1283,12 @@ class CallbackLayer : public ProtocolLayer { */ template static inline CallbackLayer< - typename std::decay::type, typename std::decay::type, void (*)()> + typename std::decay::type, typename std::decay::type, void (*)(), void (*)()> make_callback(Up&& up, Down&& down) { return CallbackLayer< - typename std::decay::type, typename std::decay::type, void (*)()>{ - std::forward(up), std::forward(down), []() {}}; + typename std::decay::type, typename std::decay::type, void (*)(), + void (*)()>{std::forward(up), std::forward(down), []() {}, []() {}}; } /*! @@ -1213,13 +1311,44 @@ make_callback(Up&& up, Down&& down) template static inline CallbackLayer< typename std::decay::type, typename std::decay::type, - typename std::decay::type> + typename std::decay::type, void (*)()> make_callback(Up&& up, Down&& down, Connected&& connected) { return CallbackLayer< typename std::decay::type, typename std::decay::type, - typename std::decay::type>{ - std::forward(up), std::forward(down), std::forward(connected)}; + typename std::decay::type, void (*)()>{ + std::forward(up), std::forward(down), std::forward(connected), + []() {}}; +} +/*! + * \brief Creates a ProtocolLayer that invokes a given callback on every messages, connected, or + * disconnected event through the layer. + * + * Use as follows: + * + * \code + * auto cb = stored::make_callback( + * [&](void*, size_t){ ... }, + * [&](void const&, size_t, bool){ ... }, + * [&](){ ... }, + * [&](){ ... }); + * \endcode + * + * The first argument (a lambda in the example above), gets the parameters as passed to \c decode(). + * The second argument get the parameters as passed to \c encode(). The third one gets invoked upon + * \c connected(), the fourth on \c disconnected(). + */ +template +static inline CallbackLayer< + typename std::decay::type, typename std::decay::type, + typename std::decay::type, typename std::decay::type> +make_callback(Up&& up, Down&& down, Connected&& connected, Disconnected&& disconnected) +{ + return CallbackLayer< + typename std::decay::type, typename std::decay::type, + typename std::decay::type, typename std::decay::type>{ + std::forward(up), std::forward(down), std::forward(connected), + std::forward(disconnected)}; } # endif // C++11 diff --git a/include/libstored/synchronizer.h b/include/libstored/synchronizer.h index 187c1fec..77287db1 100644 --- a/include/libstored/synchronizer.h +++ b/include/libstored/synchronizer.h @@ -1,19 +1,19 @@ #ifndef LIBSTORED_SYNCHRONIZER_H #define LIBSTORED_SYNCHRONIZER_H -// SPDX-FileCopyrightText: 2020-2023 Jochem Rutgers +// SPDX-FileCopyrightText: 2020-2025 Jochem Rutgers // // SPDX-License-Identifier: MPL-2.0 #ifdef __cplusplus -# include -# include -# include -# include +# include +# include +# include +# include -# include -# include -# include +# include +# include +# include namespace stored { @@ -125,15 +125,15 @@ class StoreJournal { bool hasChanged(Key key, Seq since) const; bool hasChanged(Seq since) const; -# if STORED_cplusplus >= 201103L +# if STORED_cplusplus >= 201103L // breathe does not like function typedefs using IterateChangedCallback = void(Key, void*); -# else +# else typedef void(IterateChangedCallback)(Key, void*); -# endif +# endif void iterateChanged(Seq since, IterateChangedCallback* cb, void* arg = nullptr) const; -# if STORED_cplusplus >= 201103L +# if STORED_cplusplus >= 201103L /*! * \brief Iterate all changes since the given seq. * @@ -152,7 +152,7 @@ class StoreJournal { }, &cb); } -# endif +# endif void encodeHash(ProtocolLayer& p, bool last = false) const; static void encodeHash(ProtocolLayer& p, char const* hash, bool last = false); @@ -396,14 +396,14 @@ class Synchronizable : public Base { typedef typename base::Objects Objects; -# if STORED_cplusplus >= 201103L +# if STORED_cplusplus >= 201103L template explicit Synchronizable(Args&&... args) : base(std::forward(args)...) -# else +# else Synchronizable() : base() -# endif +# endif , m_callback(*this) , m_journal(base::hash(), base::buffer(), sizeof(base::data().buffer), &m_callback) { @@ -425,7 +425,7 @@ class Synchronizable : public Base { } // NOLINTNEXTLINE(hicpp-explicit-conversions) - operator StoreJournal const &() const + operator StoreJournal const&() const { return journal(); } @@ -448,9 +448,9 @@ class Synchronizable : public Base { journal().reserveHeap(Base::VariableCount); } -# define MAX2(a, b) ((a) > (b) ? (a) : (b)) -# define MAX3(a, b, c) MAX2(MAX2((a), (b)), (c)) -# define MAX4(a, b, c, d) MAX3(MAX2((a), (b)), (c), (d)) +# define MAX2(a, b) ((a) > (b) ? (a) : (b)) +# define MAX3(a, b, c) MAX2(MAX2((a), (b)), (c)) +# define MAX4(a, b, c, d) MAX3(MAX2((a), (b)), (c), (d)) enum STORED_ANONYMOUS { /*! \brief Maximum size of any Synchronizer message for this store. */ MaxMessageSize = MAX4( @@ -465,9 +465,9 @@ class Synchronizable : public Base { 1 /*cmd*/ + 40 /*hash*/ ), }; -# undef MAX4 -# undef MAX3 -# undef MAX2 +# undef MAX4 +# undef MAX3 +# undef MAX2 protected: void __hookExitX(Type::type type, void* buffer, size_t len, bool changed) noexcept @@ -497,12 +497,11 @@ class Synchronizable : public Base { }; /*! \deprecated Use \c stored::store or \c STORE_T instead. */ -# define STORE_SYNC_BASE_CLASS(Base, Impl) \ - STORE_T(Impl, ::stored::Synchronizable, ::stored::Base) +# define STORE_SYNC_BASE_CLASS(Base, Impl) STORE_T(Impl, ::stored::Synchronizable, ::stored::Base) /*! \deprecated Use \c STORE_CLASS instead. */ -# define STORE_SYNC_CLASS_BODY(Base, Impl) \ - STORE_CLASS(Impl, ::stored::Synchronizable, ::stored::Base) +# define STORE_SYNC_CLASS_BODY(Base, Impl) \ + STORE_CLASS(Impl, ::stored::Synchronizable, ::stored::Base) class Synchronizer; @@ -533,18 +532,18 @@ class SyncConnection : public ProtocolLayer { typedef ProtocolLayer base; typedef uint16_t Id; -# ifdef DOXYGEN +# ifdef DOXYGEN // breathe does not like complex expressions. static char const Hello = 'h'; static char const Welcome = 'w'; static char const Update = 'u'; static char const Bye = 'b'; -# else +# else static char const Hello = Config::StoreInLittleEndian ? 'h' : 'H'; static char const Welcome = Config::StoreInLittleEndian ? 'w' : 'W'; static char const Update = Config::StoreInLittleEndian ? 'u' : 'U'; static char const Bye = Config::StoreInLittleEndian ? 'b' : 'B'; -# endif +# endif SyncConnection(Synchronizer& synchronizer, ProtocolLayer& connection); virtual ~SyncConnection() override; @@ -559,6 +558,7 @@ class SyncConnection : public ProtocolLayer { void decode(void* buffer, size_t len) override final; virtual void reset() override; + virtual void disconnected() override; protected: Id nextId(); diff --git a/src/protocol.cpp b/src/protocol.cpp index 15457321..e2da2572 100644 --- a/src/protocol.cpp +++ b/src/protocol.cpp @@ -466,6 +466,7 @@ ArqLayer::ArqLayer(size_t maxEncodeBuffer, ProtocolLayer* up, ProtocolLayer* dow , m_maxEncodeBuffer(maxEncodeBuffer) , m_encodeQueueSize() , m_encodeState(EncodeStateIdle) + , m_connected() , m_pauseTransmit() , m_didTransmit() , m_retransmits() @@ -495,11 +496,13 @@ void ArqLayer::reset() stored_assert(m_encodeQueueSize == 0); m_encodeState = EncodeStateIdle; + m_connected = false; m_pauseTransmit = false; m_didTransmit = false; m_retransmits = 0; m_sendSeq = 0; m_recvSeq = 0; + base::disconnected(); base::reset(); keepAlive(); } @@ -507,7 +510,7 @@ void ArqLayer::reset() void ArqLayer::decode(void* buffer, size_t len) { uint8_t* buffer_ = static_cast(buffer); - bool reconnect = false; + bool reset_handshake = false; // Usually, we expect something we have to ack. Possibly a reset command to ack afterwards. // After the response, a transmit() may be called. @@ -516,6 +519,9 @@ void ArqLayer::decode(void* buffer, size_t len) bool do_transmit = false; bool do_decode = false; + stored_assert(!m_pauseTransmit); + m_pauseTransmit = true; + while(len > 0) { uint8_t hdr = buffer_[0]; @@ -532,7 +538,8 @@ void ArqLayer::decode(void* buffer, size_t len) if(unlikely((hdr & SeqMask) == 0)) { // This is an ack to our reset message. We are connected // now. - reconnect = true; + reset_handshake = true; + m_connected = true; base::connected(); } } @@ -551,37 +558,26 @@ void ArqLayer::decode(void* buffer, size_t len) len--; } else if((hdr & SeqMask) == 0) { // This is an unexpected reset message. Reset communication. - event(EventReconnect); + base::disconnected(); + + if(isConnected()) { + m_connected = false; + event(EventReconnect); + } // Send ack. m_recvSeq = nextSeq(0); // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-constant-array-index) resp[resplen++] = (uint8_t)AckFlag; - // Also reset our send seq. - if(!reconnect - && (m_encodeQueueSize == 0 - || (*m_encodeQueue.front())[0] != (char)NopFlag)) { - // Inject reset message in queue. - String::type& s = pushEncodeQueueRaw(false); - s.push_back((char)NopFlag); - m_encodeQueueSize++; - m_sendSeq = nextSeq(0); - - // Reencode existing outbound messages. - for(Deque::type::iterator it = - ++m_encodeQueue.begin(); - it != m_encodeQueue.end(); ++it) { - (**it)[0] = - (char)((uint8_t)((uint8_t)(**it)[0] & (uint8_t)~SeqMask) - | m_sendSeq); - m_sendSeq = nextSeq(m_sendSeq); - } + if(!reset_handshake) { + // Reset from our direction too. + pushReset(); + do_transmit = true; } - do_transmit = true; - buffer_++; - len--; + // Drop the rest. + len = 0; } else if(nextSeq((uint8_t)(hdr & SeqMask)) == m_recvSeq) { // This is a retransmit of the previous message. // Send ack again. @@ -616,20 +612,17 @@ void ArqLayer::decode(void* buffer, size_t len) if(do_decode) { // Do decode first, as recursive calls to decode/encode may corrupt our buffer. - stored_assert(!m_pauseTransmit); - m_pauseTransmit = true; - resetDidTransmit(); // Decode and queue encodes only. base::decode(buffer_, len); if(didTransmit()) do_transmit = true; - - // We do not expect recursion here that influence this flag. - stored_assert(m_pauseTransmit); - m_pauseTransmit = false; } + // We do not expect recursion here that influence this flag. + stored_assert(m_pauseTransmit); + m_pauseTransmit = false; + if(resplen) { // First encode the responses... base::encode(resp, resplen, !do_transmit); @@ -751,7 +744,7 @@ void ArqLayer::event(ArqLayer::Event e) break; case EventEncodeBufferOverflow: // Cannot handle this. - abort(); + throw std::bad_alloc(); } } } @@ -814,6 +807,14 @@ size_t ArqLayer::retransmits() const return m_retransmits ? m_retransmits - 1U : 0U; } +/*! + * \brief Returns whether the connection is currently established. + */ +bool ArqLayer::isConnected() const +{ + return m_connected; +} + /*! * \brief Send a keep-alive packet to check the connection. * @@ -835,6 +836,21 @@ void ArqLayer::keepAlive() transmit(); } +/*! + * \brief Clear encode queue and push a reset message. + */ +void ArqLayer::pushReset() +{ + while(!m_encodeQueue.empty()) + popEncodeQueue(); + + stored_assert(m_encodeQueueSize == 0); + m_sendSeq = 0; + pushEncodeQueueRaw().push_back((char)(m_sendSeq | NopFlag)); + m_encodeQueueSize++; + m_sendSeq = nextSeq(m_sendSeq); +} + /*! * \brief Drop front of encode queue. */ @@ -933,6 +949,12 @@ void ArqLayer::connected() // retransmits or resets. } +void ArqLayer::disconnected() +{ + // Don't propagate the disconnected event. A disconnection is handled by this layer itself, + // via retransmits or resets. +} + ////////////////////////////// diff --git a/src/synchronizer.cpp b/src/synchronizer.cpp index 2a36738e..49f55923 100644 --- a/src/synchronizer.cpp +++ b/src/synchronizer.cpp @@ -844,6 +844,13 @@ void SyncConnection::reset() helloAgain(); } +void SyncConnection::disconnected() +{ + dropNonSources(); + base::disconnected(); + helloAgain(); +} + /*! * \brief Returns the Synchronizer that manages this connection. */ From 8260be96aa18e4c27ed81a211ec41decd957d3ed Mon Sep 17 00:00:00 2001 From: Jochem Rutgers <68805714+jhrutgers@users.noreply.github.com> Date: Fri, 28 Nov 2025 14:30:56 +0100 Subject: [PATCH 09/14] improve reconnect robustess --- CHANGELOG.rst | 5 + examples/lossy_sync/main.cpp | 10 +- include/libstored/macros.h | 6 +- include/libstored/protocol.h | 29 ++++-- include/libstored/synchronizer.h | 11 ++- src/protocol.cpp | 164 +++++++++++++++++++++++-------- src/synchronizer.cpp | 36 ++++++- 7 files changed, 200 insertions(+), 61 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 5cf44645..0d8b22d3 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -25,8 +25,13 @@ Added ````` - Operations such as ``-=`` and ``++`` for store variables in C++. + +Changed +``````` + - Don't auto-retransmit on encode by the ``stored::ArqLayer``. Only retransmit on ``keepAlive()``. +- Improve reconnection behavior on protocol layers. .. _Unreleased: https://github.com/DEMCON/libstored/compare/v2.0.0...HEAD diff --git a/examples/lossy_sync/main.cpp b/examples/lossy_sync/main.cpp index 5164db82..99bf388a 100644 --- a/examples/lossy_sync/main.cpp +++ b/examples/lossy_sync/main.cpp @@ -282,8 +282,9 @@ class SyncStack { // We don't want to do ARQ on large messages, so we segment them to some // appropriate size. wrap(32U); - // Perform retransmits. Limit the encode queue to 10 KiB. - m_arq = wrap(10240U); + // Perform retransmits. Limit the encode queue to have some bound on the maximum + // RTT. + m_arq = wrap(1024U); m_arq->setEventCallback( [](stored::ArqLayer&, stored::ArqLayer::Event event, void* arg) { static_cast(arg)->event(event); @@ -308,7 +309,7 @@ class SyncStack { // Optional: buffer partial messages to reduce the number of sends/receives on the // wire. - wrap(); + wrap(64U); if(verbose) wrap(stdout, "raw"); @@ -486,7 +487,8 @@ static void run(Arguments const& args, ExampleSync& store, DebugStack& debugStac if(!args.client.empty()) { syncStack.reset(new SyncStack{store, args.client.c_str(), false, args.verbose}); } else if(!args.server.empty()) { - syncStack.reset(new SyncStack{store, args.server.c_str(), true, args.verbose}); + syncStack.reset( + new SyncStack{store, args.server.c_str(), true, args.verbose, args.ber}); } stored::Poller poller; diff --git a/include/libstored/macros.h b/include/libstored/macros.h index ce2ad75f..3a801500 100644 --- a/include/libstored/macros.h +++ b/include/libstored/macros.h @@ -292,7 +292,11 @@ typedef SSIZE_T ssize_t; # define nullptr NULL # endif # ifndef noexcept -# define noexcept throw() +# ifdef STORED_cpp_exceptions +# define noexcept throw() +# else +# define noexcept +# endif # endif # endif # ifndef is_default diff --git a/include/libstored/protocol.h b/include/libstored/protocol.h index 5a19101c..f50b515d 100644 --- a/include/libstored/protocol.h +++ b/include/libstored/protocol.h @@ -463,6 +463,7 @@ class TerminalLayer : public ProtocolLayer { # endif virtual size_t mtu() const override; virtual void reset() override; + virtual void disconnected() override; virtual void nonDebugEncode(void const* buffer, size_t len); @@ -519,6 +520,7 @@ class SegmentationLayer : public ProtocolLayer { size_t mtu() const final; size_t lowerMtu() const; virtual void reset() override; + virtual void disconnected() override; private: size_t m_mtu; @@ -587,7 +589,7 @@ class ArqLayer : public ProtocolLayer { enum STORED_ANONYMOUS { /*! \brief Number of successive retransmits before the event is emitted. */ - RetransmitCallbackThreshold = 10, + RetransmitCallbackThreshold = 16, }; explicit ArqLayer( @@ -731,7 +733,7 @@ class ArqLayer : public ProtocolLayer { /*! * \brief A layer that performs Automatic Repeat Request operations on messages for - * #stored::Debugger. + * #stored::Debugger. * * Only apply this layer on #stored::Debugger, as it assumes a REQ/REP * mechanism. For a general purpose ARQ, use #stored::ArqLayer. @@ -739,7 +741,7 @@ class ArqLayer : public ProtocolLayer { * This layer allows messages that are lost, to be retransmitted on both * the request and response side. The implementation assumes that lost * message is possible, but rare. It optimizes on the normal case that - * message arrive. Retransmits may be relatively expensive. + * messages arrive. Retransmits may be relatively expensive. * * Messages must be either lost or arrive correctly. Make sure to do * checksumming in the layer below. Moreover, you might want the @@ -783,13 +785,13 @@ class ArqLayer : public ProtocolLayer { * The application has limited buffering. So, neither the request nor the * full response may be buffered for (partial) retransmission. Therefore, * it may be the case that when the response was lost, the request is - * reexecuted. It is up to the buffer size as specified in DebugArqLayer's + * re-executed. It is up to the buffer size as specified in DebugArqLayer's * constructor and stored::Debugger to determine when it is safe or - * required to reexected upon every retransmit. For example, writes are not - * reexecuted, as a write may have unexpected side-effects, while it is - * safe to reexecute a read of a normal variable. When the directory is + * required to re-executed upon every retransmit. For example, writes are not + * re-executed, as a write may have unexpected side-effects, while it is + * safe to re-execute a read of a normal variable. When the directory is * requested, the response is often too long to buffer, and the response is - * constant, so it is not buffered either and just reexecuted. Note that if + * constant, so it is not buffered either and just re-executed. Note that if * the buffer is too small, reading from a stream (s command) will do a * destructive read, but this data may be lost if the response is lost. * Configure the stream size and DebugArqLayer's buffer appropriate if that is @@ -799,7 +801,7 @@ class ArqLayer : public ProtocolLayer { * numbers more often. Upon retransmission of the same data, the same * sequence numbers are used, just like the retransmission of the request. * However, if the data may have been changed, as the response was not - * buffered and the request was reexecuted, the reset flag is set of the + * buffered and the request was re-executed, the reset flag is set of the * first response message, while it has a new sequence number. The client * should accept this new sequence number and discard all previously * collected response messages. @@ -900,6 +902,7 @@ class Crc8Layer : public ProtocolLayer { virtual size_t mtu() const override; virtual void reset() override; + virtual void disconnected() override; protected: static uint8_t compute(uint8_t input, uint8_t crc = init); @@ -932,6 +935,7 @@ class Crc16Layer : public ProtocolLayer { virtual size_t mtu() const override; virtual void reset() override; + virtual void disconnected() override; protected: static uint16_t compute(uint8_t input, uint16_t crc = init); @@ -968,6 +972,7 @@ class Crc32Layer : public ProtocolLayer { virtual size_t mtu() const override; virtual void reset() override; + virtual void disconnected() override; protected: static uint32_t compute(uint8_t input, uint32_t crc = init); @@ -1000,6 +1005,10 @@ class BufferLayer : public ProtocolLayer { # endif virtual void reset() override; + virtual void connected() override; + +protected: + void allocate(); private: size_t m_size; @@ -1365,6 +1374,8 @@ class Loopback1 final : public ProtocolLayer { void encode(void const* buffer, size_t len, bool last = true) override final; void reset() override final; + void connected() override final; + void disconnected() override final; void reserve(size_t capacity); private: diff --git a/include/libstored/synchronizer.h b/include/libstored/synchronizer.h index 77287db1..82310687 100644 --- a/include/libstored/synchronizer.h +++ b/include/libstored/synchronizer.h @@ -551,6 +551,7 @@ class SyncConnection : public ProtocolLayer { Synchronizer& synchronizer() const; bool isSynchronizing(StoreJournal& store) const; + bool isConnected(StoreJournal& store) const; void source(StoreJournal& store); void drop(StoreJournal& store); @@ -591,10 +592,17 @@ class SyncConnection : public ProtocolLayer { {} StoreJournal::Seq seq; - // Id determined by remote class (got via Hello message) + // Id determined by remote class (got via Hello message). + // idOut == 0 means not connected. Id idOut; // When true, this store was initially synchronized from there to here. bool source; + + /*! \brief Return if the store is connected. */ + bool connected() const + { + return idOut != 0; + } }; typedef Map::type StoreMap; @@ -694,6 +702,7 @@ class Synchronizer { bool isSynchronizing(StoreJournal& j) const; bool isSynchronizing(StoreJournal& j, SyncConnection& notOverConnection) const; + bool isConnected(StoreJournal& j, SyncConnection& connection) const; /*! * \brief Return a buffer large enough to encode messages in. diff --git a/src/protocol.cpp b/src/protocol.cpp index e2da2572..1f82cd37 100644 --- a/src/protocol.cpp +++ b/src/protocol.cpp @@ -36,6 +36,7 @@ #endif #include +#include #include namespace stored { @@ -54,9 +55,9 @@ namespace stored { ProtocolLayer::~ProtocolLayer() { if(up() && up()->down() == this) - up()->setDown(down()); + up()->justSetDown(down()); if(down() && down()->up() == this) - down()->setUp(up()); + down()->justSetUp(up()); } @@ -219,6 +220,13 @@ void TerminalLayer::reset() base::reset(); } +void TerminalLayer::disconnected() +{ + m_decodeState = StateNormal; + m_buffer.clear(); + base::disconnected(); +} + /*! * \copydoc stored::ProtocolLayer::~ProtocolLayer() */ @@ -360,6 +368,12 @@ void SegmentationLayer::reset() base::reset(); } +void SegmentationLayer::disconnected() +{ + m_decode.clear(); + base::disconnected(); +} + size_t SegmentationLayer::mtu() const { // We segment, so all layers above can use any size they want. @@ -523,11 +537,17 @@ void ArqLayer::decode(void* buffer, size_t len) m_pauseTransmit = true; while(len > 0) { - uint8_t hdr = buffer_[0]; + uint8_t const hdr = buffer_[0]; + uint8_t const hdrSeq = (uint8_t)(hdr & SeqMask); if(hdr & AckFlag) { + if(unlikely(hdrSeq == 0)) { + // This may be an ack to our reset message. + reset_handshake = true; + } + if(waitingForAck() - && (hdr & SeqMask) == ((uint8_t)(*m_encodeQueue.front())[0] & SeqMask)) { + && hdrSeq == ((uint8_t)(*m_encodeQueue.front())[0] & SeqMask)) { // They got our last transmission. popEncodeQueue(); m_retransmits = 0; @@ -535,55 +555,53 @@ void ArqLayer::decode(void* buffer, size_t len) // Transmit next message, if any. do_transmit = true; - if(unlikely((hdr & SeqMask) == 0)) { + if(unlikely(reset_handshake)) { // This is an ack to our reset message. We are connected // now. - reset_handshake = true; m_connected = true; + m_recvSeq = nextSeq(0); base::connected(); } } buffer_++; len--; - } else if(likely((hdr & SeqMask) == m_recvSeq)) { - // This is a proper next message. - // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-constant-array-index) - resp[resplen++] = (uint8_t)(m_recvSeq | AckFlag); - m_recvSeq = nextSeq(m_recvSeq); - - do_decode = !(hdr & NopFlag); - do_transmit = true; // Send out next message. - buffer_++; - len--; - } else if((hdr & SeqMask) == 0) { - // This is an unexpected reset message. Reset communication. - base::disconnected(); - - if(isConnected()) { - m_connected = false; - event(EventReconnect); - } + } else if(unlikely(hdrSeq == 0)) { + // This is part of the reset handshake. // Send ack. - m_recvSeq = nextSeq(0); // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-constant-array-index) resp[resplen++] = (uint8_t)AckFlag; + // Drop the rest. + len = 0; if(!reset_handshake) { - // Reset from our direction too. + // This is an unexpected reset message. Reset communication. pushReset(); do_transmit = true; + + if(isConnected()) { + m_connected = false; + event(EventReconnect); + } + + base::disconnected(); } + } else if(likely(hdrSeq == m_recvSeq)) { + // This is a proper next message. + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-constant-array-index) + resp[resplen++] = (uint8_t)(m_recvSeq | AckFlag); + m_recvSeq = nextSeq(m_recvSeq); - // Drop the rest. - len = 0; - } else if(nextSeq((uint8_t)(hdr & SeqMask)) == m_recvSeq) { + do_decode = !(hdr & NopFlag); + do_transmit = true; // Send out next message. + buffer_++; + len--; + } else if(nextSeq(hdrSeq) == m_recvSeq) { // This is a retransmit of the previous message. // Send ack again. // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-constant-array-index) - resp[resplen++] = (uint8_t)((uint8_t)(hdr & SeqMask) | AckFlag); - + resp[resplen++] = (uint8_t)(hdrSeq | AckFlag); if((hdr & NopFlag)) { buffer_++; len--; @@ -712,10 +730,12 @@ bool ArqLayer::transmit() // Only queue for now. return false; - if(m_retransmits < std::numeric_limits::max()) - m_retransmits++; - if(m_retransmits >= RetransmitCallbackThreshold) + if(m_retransmits < std::numeric_limits::max()) { + if(++m_retransmits % RetransmitCallbackThreshold == 0) + event(EventRetransmit); + } else { event(EventRetransmit); + } // (Re)transmit first message. stored_assert(waitingForAck()); @@ -849,6 +869,7 @@ void ArqLayer::pushReset() pushEncodeQueueRaw().push_back((char)(m_sendSeq | NopFlag)); m_encodeQueueSize++; m_sendSeq = nextSeq(m_sendSeq); + m_recvSeq = 0; } /*! @@ -1327,6 +1348,12 @@ void Crc8Layer::reset() base::reset(); } +void Crc8Layer::disconnected() +{ + m_crc = init; + base::disconnected(); +} + void Crc8Layer::decode(void* buffer, size_t len) { if(len == 0) @@ -1423,6 +1450,12 @@ void Crc16Layer::reset() base::reset(); } +void Crc16Layer::disconnected() +{ + m_crc = init; + base::disconnected(); +} + void Crc16Layer::decode(void* buffer, size_t len) { if(len < 2) @@ -1533,6 +1566,12 @@ void Crc32Layer::reset() base::reset(); } +void Crc32Layer::disconnected() +{ + m_crc = (uint32_t)init; + base::disconnected(); +} + void Crc32Layer::decode(void* buffer, size_t len) { if(len < 4) @@ -1595,12 +1634,15 @@ size_t Crc32Layer::mtu() const /*! * \brief Constructor for a buffer with given size. * - * If \p size is 0, the buffer it not bounded. + * If \p size is 0, the buffer it not bounded. Note that for unbounded buffers, the buffer may + * dynamically allocate memory as needed, which may fail. */ BufferLayer::BufferLayer(size_t size, ProtocolLayer* up, ProtocolLayer* down) : base(up, down) , m_size(size ? size : std::numeric_limits::max()) -{} +{ + allocate(); +} void BufferLayer::reset() { @@ -1608,6 +1650,43 @@ void BufferLayer::reset() base::reset(); } +void BufferLayer::connected() +{ + allocate(); + base::connected(); +} + +void BufferLayer::allocate() +{ + size_t const max_prealloc = 0x100000U; // 1 MiB + + bool const unbounded = m_size == std::numeric_limits::max(); + size_t const mtu = this->mtu(); + + if(mtu == 0 && unbounded) { + // No limit. Allocate when required. + return; + } + + size_t bounded = m_buffer.max_size(); + if(!unbounded && m_size < bounded) + bounded = m_size; + if(mtu && mtu < bounded) + bounded = mtu; + + if(bounded > max_prealloc) { + // High limit. Assume that dynamic allocation is fine. + return; + } + + if(m_buffer.capacity() >= bounded) { + // Already allocated. + return; + } + + m_buffer.reserve(bounded); +} + /*! * \brief Collects all partial buffers, and passes the full encoded data on when \p last is set. */ @@ -1904,6 +1983,10 @@ void impl::Loopback1::reset() base::reset(); } +void impl::Loopback1::connected() {} + +void impl::Loopback1::disconnected() {} + void impl::Loopback1::reserve(size_t capacity) { if(likely(capacity <= m_capacity)) @@ -1911,13 +1994,8 @@ void impl::Loopback1::reserve(size_t capacity) // NOLINTNEXTLINE(cppcoreguidelines-owning-memory, cppcoreguidelines-no-malloc) void* p = realloc(m_buffer, capacity); - if(unlikely(!p)) { -#ifdef STORED_cpp_exceptions + if(unlikely(!p)) throw std::bad_alloc(); -#else - std::terminate(); -#endif - } m_buffer = static_cast(p); m_capacity = capacity; @@ -1925,7 +2003,7 @@ void impl::Loopback1::reserve(size_t capacity) /*! * \brief Collect partial data, and passes into the \c decode() of \c to when it has the full - * message. + * message. */ void impl::Loopback1::encode(void const* buffer, size_t len, bool last) { diff --git a/src/synchronizer.cpp b/src/synchronizer.cpp index 49f55923..3dd9c4c0 100644 --- a/src/synchronizer.cpp +++ b/src/synchronizer.cpp @@ -883,6 +883,17 @@ bool SyncConnection::isSynchronizing(StoreJournal& store) const return m_store.find(&store) != m_store.end(); } +/*! + * \brief Returns if the given store is currently connected over this connection. + */ +bool SyncConnection::isConnected(StoreJournal& store) const +{ + StoreMap::const_iterator it = m_store.find(&store); + if(it == m_store.end()) + return false; + return it->second.connected(); +} + /*! * \brief Use this connection as a source of the given store. */ @@ -1076,7 +1087,7 @@ StoreJournal::Seq SyncConnection::process(StoreJournal& store, void* encodeBuffe if(s == m_store.end()) // Unknown store. return 0; - if(!store.hasChanged(s->second.seq)) + if(!s->second.connected() || !store.hasChanged(s->second.seq)) // No recent changes. return 0; @@ -1145,6 +1156,7 @@ void SyncConnection::decode(void* buffer, size_t len) break; } + stored_assert(si.connected()); id = nextId(); m_idIn[id] = j; encodeId(id, false); @@ -1188,6 +1200,7 @@ void SyncConnection::decode(void* buffer, size_t len) si.seq = seq; si.idOut = welcome_id; + stored_assert(si.connected()); break; } case Update: { @@ -1243,7 +1256,7 @@ void SyncConnection::decode(void* buffer, size_t len) break; StoreInfo const& si = m_store[it->second]; - if(si.source && si.idOut) + if(si.source && si.connected()) // Hey, we need it! helloAgain(*(it->second)); else @@ -1259,7 +1272,7 @@ void SyncConnection::decode(void* buffer, size_t len) break; StoreInfo const& si = it->second; - if(si.source && si.idOut) + if(si.source && si.connected()) // Hey, we need it! helloAgain(*j); else @@ -1365,6 +1378,7 @@ void SyncConnection::helloAgain(StoreJournal& store) } stored_assert(id); + stored_assert(!si.connected()); encodeCmd(Hello); store.encodeHash(*this, false); @@ -1497,6 +1511,9 @@ StoreJournal::Seq Synchronizer::process(ProtocolLayer& connection, StoreJournal& return 0; } +/*! + * \brief Returns if the given store is registered for synchronization over any connection. + */ bool Synchronizer::isSynchronizing(StoreJournal& j) const { for(Connections::iterator it = m_connections.begin(); it != m_connections.end(); ++it) @@ -1506,6 +1523,10 @@ bool Synchronizer::isSynchronizing(StoreJournal& j) const return false; } +/*! + * \brief Returns if the given store is registered for synchronization over any connection, + * except the given one. + */ bool Synchronizer::isSynchronizing(StoreJournal& j, SyncConnection& notOverConnection) const { for(Connections::iterator it = m_connections.begin(); it != m_connections.end(); ++it) { @@ -1517,4 +1538,13 @@ bool Synchronizer::isSynchronizing(StoreJournal& j, SyncConnection& notOverConne return false; } +/*! + * \brief Returns if the given store is currently connected over the given connection. + */ +bool Synchronizer::isConnected(StoreJournal& j, SyncConnection& connection) const +{ + SyncConnection const* c = toConnection(connection); + return c && c->isConnected(j); +} + } // namespace stored From 1f7f66ae18b9e81fce876082ac26a4d04765cf74 Mon Sep 17 00:00:00 2001 From: Jochem Rutgers <68805714+jhrutgers@users.noreply.github.com> Date: Fri, 28 Nov 2025 15:18:12 +0100 Subject: [PATCH 10/14] cleanup exceptions --- examples/lossy_sync/main.cpp | 36 +++++++++++---------- include/libstored/allocator.h | 60 +++++++++++++++-------------------- include/libstored/fifo.h | 10 +++--- include/libstored/macros.h | 42 +++++++++++++----------- include/libstored/poller.h | 9 ++---- include/libstored/spm.h | 14 ++------ src/compress.cpp | 28 ++++++---------- src/protocol.cpp | 10 ++---- 8 files changed, 88 insertions(+), 121 deletions(-) diff --git a/examples/lossy_sync/main.cpp b/examples/lossy_sync/main.cpp index 99bf388a..6d7e7ac1 100644 --- a/examples/lossy_sync/main.cpp +++ b/examples/lossy_sync/main.cpp @@ -140,10 +140,10 @@ static Arguments parse_arguments(int argc, char** argv) try { int port = std::stoi(optarg); if(port <= 0 || port >= 0x10000) - throw std::invalid_argument{"Invalid port"}; + STORED_throw(std::invalid_argument{"Invalid port"}); args.debug_port = port; } catch(std::exception& e) { - throw std::invalid_argument{e.what()}; + STORED_throw(std::invalid_argument{e.what()}); } break; case 'v': @@ -160,30 +160,30 @@ static Arguments parse_arguments(int argc, char** argv) try { args.ber = std::stof(optarg); if(args.ber < 0.F || args.ber > 1.F) - throw std::invalid_argument{"Invalid BER"}; + STORED_throw(std::invalid_argument{"Invalid BER"}); } catch(std::invalid_argument&) { - throw; + STORED_rethrow; } catch(std::exception& e) { - throw std::invalid_argument{e.what()}; + STORED_throw(std::invalid_argument{e.what()}); } break; case 'h': print_help(stdout, argv[0]); - throw exit_now(); + STORED_throw(exit_now()); default: print_help(stderr, argv[0]); - throw std::invalid_argument{""}; + STORED_throw(std::invalid_argument{""}); } } if(!args.client.empty() && !args.server.empty()) { log("Cannot be both client and server\n"); - throw std::invalid_argument{""}; + STORED_throw(std::invalid_argument{""}); } if(args.client.empty() && args.server.empty()) { log("Must be either client or server\n"); - throw std::invalid_argument{""}; + STORED_throw(std::invalid_argument{""}); } return args; @@ -209,7 +209,7 @@ class DebugStack { if((errno = m_debugLayer.lastError())) { log("Cannot initialize ZMQ for debugging, got error %d; %s\n", errno, zmq_strerror(errno)); - throw std::runtime_error{"ZMQ initialization failed"}; + STORED_throw(std::runtime_error{"ZMQ initialization failed"}); } m_id = "lossy_sync"; @@ -266,14 +266,14 @@ class SyncStack { if((errno = m_zmqLayer.lastError())) { log("Cannot initialize ZMQ for sync, got error %d; %s\n", errno, zmq_strerror(errno)); - throw std::runtime_error{"ZMQ initialization failed"}; + STORED_throw(std::runtime_error{"ZMQ initialization failed"}); } int linger = 0; if(zmq_setsockopt(m_zmqLayer.socket(), ZMQ_LINGER, &linger, sizeof(linger)) == -1) { log("Cannot set ZMQ_LINGER, got error %d; %s\n", errno, zmq_strerror(errno)); - throw std::runtime_error{"ZMQ setsockopt failed"}; + STORED_throw(std::runtime_error{"ZMQ setsockopt failed"}); } if(verbose) @@ -421,7 +421,7 @@ class SyncStack { auto dt = now - m_idleUpSince; if(dt > std::chrono::milliseconds(DisconnectTimeout_ms)) { log("No upstream activity, disconnecting\n"); - throw disconnected{}; + STORED_throw(disconnected{}); } } else { m_idleUpSince = now; @@ -447,7 +447,7 @@ class SyncStack { switch(event) { case stored::ArqLayer::EventEncodeBufferOverflow: log("ARQ encode buffer overflow\n"); - throw disconnected{}; + STORED_throw(disconnected{}); case stored::ArqLayer::EventReconnect: log("ARQ reconnect event\n"); break; @@ -494,12 +494,12 @@ static void run(Arguments const& args, ExampleSync& store, DebugStack& debugStac stored::Poller poller; if((errno = poller.add(debugStack.pollable()))) { perror("Cannot add pollable"); - throw std::runtime_error{"Poller add failed"}; + STORED_throw(std::runtime_error{"Poller add failed"}); } if((errno = poller.add(syncStack->pollable()))) { perror("Cannot add pollable"); - throw std::runtime_error{"Poller add failed"}; + STORED_throw(std::runtime_error{"Poller add failed"}); } try { @@ -523,7 +523,7 @@ static void run(Arguments const& args, ExampleSync& store, DebugStack& debugStac now = std::chrono::steady_clock::now(); } - throw; + STORED_rethrow; } } @@ -556,7 +556,9 @@ int main(int argc, char** argv) } catch(std::invalid_argument&) { return 1; } catch(std::exception& e) { +#ifdef STORED_cpp_exceptions log("Error: %s\n", e.what()); +#endif return 2; } catch(...) { log("Unknown error\n"); diff --git a/include/libstored/allocator.h b/include/libstored/allocator.h index 9b498e4c..6ef4319c 100644 --- a/include/libstored/allocator.h +++ b/include/libstored/allocator.h @@ -1,6 +1,6 @@ #ifndef LIBSTORED_ALLOCATOR_H #define LIBSTORED_ALLOCATOR_H -// SPDX-FileCopyrightText: 2020-2024 Jochem Rutgers +// SPDX-FileCopyrightText: 2020-2025 Jochem Rutgers // // SPDX-License-Identifier: MPL-2.0 @@ -67,26 +67,26 @@ static inline void deallocate(T* p, size_t n = 1) noexcept * \param T the type of the class for which the operators are to be defined */ // cppcheck-suppress-macro duplInheritedMember -# define STORED_CLASS_NEW_DELETE(T) \ - public: \ - void* operator new(std::size_t n) \ - { \ - STORED_UNUSED(n) \ - stored_assert(n == sizeof(T)); \ - return ::stored::allocate(); \ - } \ - void* operator new(std::size_t n, void* ptr) /* NOLINT */ \ - { \ - STORED_UNUSED(n) \ - stored_assert(n == sizeof(T)); \ - return ptr; \ - } \ - void operator delete(void* ptr) \ - { \ - ::stored::deallocate(static_cast(ptr)); /* NOLINT */ \ - } \ - \ - private: +# define STORED_CLASS_NEW_DELETE(T) \ + public: \ + void* operator new(std::size_t n) \ + { \ + STORED_UNUSED(n) \ + stored_assert(n == sizeof(T)); \ + return ::stored::allocate(); \ + } \ + void* operator new(std::size_t n, void* ptr) /* NOLINT */ \ + { \ + STORED_UNUSED(n) \ + stored_assert(n == sizeof(T)); \ + return ptr; \ + } \ + void operator delete(void* ptr) \ + { \ + ::stored::deallocate(static_cast(ptr)); /* NOLINT */ \ + } \ + \ + private: /*! * \brief Wrapper for Config::Allocator::type::deallocate() after destroying the given object. @@ -188,11 +188,7 @@ class Callable { R operator()(typename CallableArgType::type... /*args*/) const final { -# ifdef STORED_cpp_exceptions - throw std::bad_function_call(); -# else - std::terminate(); -# endif + STORED_throw(std::bad_function_call()); } // NOLINTNEXTLINE(hicpp-explicit-conversions) @@ -339,9 +335,7 @@ class Callable { m_w = w; } catch(...) { cleanup(w); -# ifdef STORED_cpp_exceptions - throw; -# endif + STORED_rethrow; } } return *this; @@ -436,9 +430,7 @@ class Callable { c.get().clone(m_buffer.data()); } catch(...) { construct(); -# ifdef STORED_cpp_exceptions - throw; -# endif + STORED_rethrow; } } return *this; @@ -515,9 +507,7 @@ class Callable { } catch(...) { // Make sure we always have a valid instance in the buffer. construct(); -# ifdef STORED_cpp_exceptions - throw; -# endif + STORED_rethrow; } } diff --git a/include/libstored/fifo.h b/include/libstored/fifo.h index 83946454..e51e7fe5 100644 --- a/include/libstored/fifo.h +++ b/include/libstored/fifo.h @@ -16,6 +16,7 @@ # include # include # include +# include # include # include # include @@ -241,7 +242,8 @@ class BufferView { BufferView subview(size_t offset, size_t len) const { - return BufferView{*m_b, absolute((pointer)offset), absolute((pointer)(offset + len))}; + return BufferView{ + *m_b, absolute((pointer)offset), absolute((pointer)(offset + len))}; } BufferView subview(size_t offset) const @@ -985,11 +987,7 @@ class MessageFifo { if(unlikely(Capacity > 0 && message.size() + partial > Capacity)) { // Will never fit. -# ifdef STORED_cpp_exceptions - throw std::bad_alloc(); -# else - std::terminate(); -# endif + STORED_throw(std::bad_alloc()); } if(wp == rp && partial == 0) { diff --git a/include/libstored/macros.h b/include/libstored/macros.h index 3a801500..24603773 100644 --- a/include/libstored/macros.h +++ b/include/libstored/macros.h @@ -264,19 +264,6 @@ typedef SSIZE_T ssize_t; #if defined(STORED_cplusplus) // All C++ -# if !defined(STORED_cpp_exceptions) && defined(__cpp_exceptions) -# define STORED_cpp_exceptions __cpp_exceptions -# endif -# if !defined(STORED_cpp_exceptions) -# define try if(true) -# define catch(...) if(false) -# define throw std::terminate(), (void) -# endif - -# if !defined(STORED_cpp_rtti) && defined(__cpp_rtti) -# define STORED_cpp_rtti __cpp_rtti -# endif - # if STORED_cplusplus < 201103L // < C++11 # ifndef STORED_COMPILER_MSVC # ifndef constexpr @@ -292,11 +279,7 @@ typedef SSIZE_T ssize_t; # define nullptr NULL # endif # ifndef noexcept -# ifdef STORED_cpp_exceptions -# define noexcept throw() -# else -# define noexcept -# endif +# define noexcept throw() # endif # endif # ifndef is_default @@ -342,7 +325,28 @@ typedef SSIZE_T ssize_t; # endif # endif # endif -#endif + +# if !defined(STORED_cpp_exceptions) && defined(__cpp_exceptions) +# define STORED_cpp_exceptions __cpp_exceptions +# endif +# if !defined(STORED_cpp_exceptions) +# define try if_constexpr(true) +# define catch(...) if_constexpr(false) +# define STORED_throw(e) \ + do { /* NOLINT(cppcoreguidelines-avoid-do-while) */ \ + (void)fprintf(stderr, "Exception: %s\n", #e); \ + std::terminate(); \ + } while(0) +# define STORED_rethrow std::terminate() +# else +# define STORED_throw(e) throw e +# define STORED_rethrow throw +# endif + +# if !defined(STORED_cpp_rtti) && defined(__cpp_rtti) +# define STORED_cpp_rtti __cpp_rtti +# endif +#endif // STORED_cplusplus #ifndef STORED_thread_local # if defined(STORED_OS_BAREMETAL) || defined(STORED_OS_GENERIC) diff --git a/include/libstored/poller.h b/include/libstored/poller.h index b4e7bb48..083944a1 100644 --- a/include/libstored/poller.h +++ b/include/libstored/poller.h @@ -795,15 +795,10 @@ class InheritablePoller : public PollerImpl { switch(errno) { case 0: break; -# ifdef STORED_cpp_exceptions case ENOMEM: - throw std::bad_alloc(); + STORED_throw(std::bad_alloc()); default: - throw std::runtime_error(""); -# else - default: - std::terminate(); -# endif + STORED_throw(std::runtime_error("")); } } diff --git a/include/libstored/spm.h b/include/libstored/spm.h index 34312bd0..8422ad29 100644 --- a/include/libstored/spm.h +++ b/include/libstored/spm.h @@ -1,6 +1,6 @@ #ifndef LIBSTORED_SPM_H #define LIBSTORED_SPM_H -// SPDX-FileCopyrightText: 2020-2023 Jochem Rutgers +// SPDX-FileCopyrightText: 2020-2025 Jochem Rutgers // // SPDX-License-Identifier: MPL-2.0 @@ -312,12 +312,8 @@ class ScratchPad { m_old.push_back(m_buffer); } catch(...) { deallocate(p, size + chunkHeader); -# ifdef STORED_cpp_exceptions // cppcheck-suppress rethrowNoCurrentException - throw; -# else - std::terminate(); -# endif + STORED_rethrow; } } @@ -530,11 +526,7 @@ class ScratchPad { if(unlikely(m_total + alloc_size + padding < m_total)) { // Wrap around -> overflow. -# ifdef STORED_cpp_exceptions - throw std::bad_alloc(); -# else - std::terminate(); -# endif + STORED_throw(std::bad_alloc()); } size_t bs = bufferSize(); diff --git a/src/compress.cpp b/src/compress.cpp index 162648db..e24e8dbb 100644 --- a/src/compress.cpp +++ b/src/compress.cpp @@ -1,4 +1,4 @@ -// SPDX-FileCopyrightText: 2020-2023 Jochem Rutgers +// SPDX-FileCopyrightText: 2020-2025 Jochem Rutgers // // SPDX-License-Identifier: MPL-2.0 @@ -7,8 +7,8 @@ #ifdef STORED_HAVE_HEATSHRINK extern "C" { -# include -# include +# include +# include } /*! @@ -19,7 +19,7 @@ static heatshrink_encoder& encoder_(void* e) stored_assert(e); return *static_cast(e); } -# define encoder() (encoder_(m_encoder)) // NOLINT(cppcoreguidelines-macro-usage) +# define encoder() (encoder_(m_encoder)) // NOLINT(cppcoreguidelines-macro-usage) /*! * \brief Helper to get a \c heatshrink_decoder reference from \c CompressLayer::m_decoder. @@ -29,7 +29,7 @@ static heatshrink_decoder& decoder_(void* d) stored_assert(d); return *static_cast(d); } -# define decoder() (decoder_(m_decoder)) // NOLINT(cppcoreguidelines-macro-usage) +# define decoder() (decoder_(m_decoder)) // NOLINT(cppcoreguidelines-macro-usage) namespace stored { @@ -65,13 +65,8 @@ void CompressLayer::decode(void* buffer, size_t len) if(unlikely(!m_decoder)) { m_decoder = heatshrink_decoder_alloc(DecodeInputBuffer, Window, Lookahead); - if(!m_decoder) { -# ifdef STORED_cpp_exceptions - throw std::bad_alloc(); -# else - std::terminate(); -# endif - } + if(!m_decoder) + STORED_throw(std::bad_alloc()); } m_state |= (uint8_t)FlagDecoding; @@ -133,13 +128,8 @@ void CompressLayer::encode(void const* buffer, size_t len, bool last) if(unlikely(!m_encoder)) { m_encoder = heatshrink_encoder_alloc(Window, Lookahead); - if(!m_encoder) { -# ifdef STORED_cpp_exceptions - throw std::bad_alloc(); -# else - std::terminate(); -# endif - } + if(!m_encoder) + STORED_throw(std::bad_alloc()); } m_state |= (uint8_t)FlagEncoding; diff --git a/src/protocol.cpp b/src/protocol.cpp index 1f82cd37..b4d89ec8 100644 --- a/src/protocol.cpp +++ b/src/protocol.cpp @@ -764,7 +764,7 @@ void ArqLayer::event(ArqLayer::Event e) break; case EventEncodeBufferOverflow: // Cannot handle this. - throw std::bad_alloc(); + STORED_throw(std::bad_alloc()); } } } @@ -1995,7 +1995,7 @@ void impl::Loopback1::reserve(size_t capacity) // NOLINTNEXTLINE(cppcoreguidelines-owning-memory, cppcoreguidelines-no-malloc) void* p = realloc(m_buffer, capacity); if(unlikely(!p)) - throw std::bad_alloc(); + STORED_throw(std::bad_alloc()); m_buffer = static_cast(p); m_capacity = capacity; @@ -3368,11 +3368,7 @@ NamedPipeLayer::NamedPipeLayer( // Oops, revert the handler and accept SIGPIPEs. if(signal(SIGPIPE, oldh) == SIG_ERR) { // Really oops... Now we broke something. -# ifdef STORED_cpp_exceptions - throw std::runtime_error("Cannot restore SIGPIPE handler"); -# else - std::terminate(); -# endif + STORED_throw(std::runtime_error("Cannot restore SIGPIPE handler")); } } From 0f577b36677c366f29fe9430cb10bc4f34ae6546 Mon Sep 17 00:00:00 2001 From: Jochem Rutgers <68805714+jhrutgers@users.noreply.github.com> Date: Fri, 28 Nov 2025 15:41:06 +0100 Subject: [PATCH 11/14] doc --- CHANGELOG.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 0d8b22d3..f6bbac0c 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -25,6 +25,7 @@ Added ````` - Operations such as ``-=`` and ``++`` for store variables in C++. +- YAML export of store meta-data. Changed ``````` From 06f48e74ef076e6614dd54d6c4359d222adaab80 Mon Sep 17 00:00:00 2001 From: Jochem Rutgers <68805714+jhrutgers@users.noreply.github.com> Date: Fri, 28 Nov 2025 16:19:52 +0100 Subject: [PATCH 12/14] compile fixes --- examples/CMakeLists.txt | 11 +---- examples/lib/include/getopt_mini.h | 2 +- examples/lib/src/getopt_mini.cpp | 4 +- examples/lossy_sync/main.cpp | 13 +++-- include/libstored/debugger.h | 76 +++++++++++++++--------------- include/libstored/fifo.h | 8 ++-- include/libstored/types.h | 58 +++++++++++------------ sphinx/doc/cpp_poller.rst | 2 +- 8 files changed, 85 insertions(+), 89 deletions(-) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 66be2e13..d9ec1b9d 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -37,16 +37,7 @@ if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND NOT APPLE) endif() if(NOT CMAKE_CXX_STANDARD) - foreach( - cxx - cxx_std_26 - cxx_std_23 - cxx_std_20 - cxx_std_17 - cxx_std_14 - cxx_std_11 - cxx_std_98 - ) + foreach(cxx cxx_std_17 cxx_std_14 cxx_std_11 cxx_std_98) list(FIND CMAKE_CXX_COMPILE_FEATURES "${cxx}" _cxx) if(${_cxx} GREATER -1) string(REPLACE "cxx_std_" "" _std "${cxx}") diff --git a/examples/lib/include/getopt_mini.h b/examples/lib/include/getopt_mini.h index 3e0c3f4e..3f66c97b 100644 --- a/examples/lib/include/getopt_mini.h +++ b/examples/lib/include/getopt_mini.h @@ -4,7 +4,7 @@ // // SPDX-License-Identifier: MIT -#if defined(__linux__) || defined(__APPLE__) +#if defined(__linux__) || defined(__APPLE__) || defined(__MINGW32__) || defined(__MINGW64__) // Just use glibc's one. # include #else // !POSIX diff --git a/examples/lib/src/getopt_mini.cpp b/examples/lib/src/getopt_mini.cpp index 3afc64ca..6f346de6 100644 --- a/examples/lib/src/getopt_mini.cpp +++ b/examples/lib/src/getopt_mini.cpp @@ -4,14 +4,14 @@ #include -#if !defined(__linux__) && !defined(__APPLE__) +#if !defined(__linux__) && !defined(__APPLE__) && !defined(__MINGW32__) && !defined(__MINGW64__) # include int opterr = 1; int optopt = 0; int optind = 1; -char* optarg = nullptr; +char* optarg = NULL; // flawfinder: ignore int getopt(int argc, char* const* argv, char const* options) diff --git a/examples/lossy_sync/main.cpp b/examples/lossy_sync/main.cpp index 6d7e7ac1..4062bdbc 100644 --- a/examples/lossy_sync/main.cpp +++ b/examples/lossy_sync/main.cpp @@ -10,6 +10,7 @@ #include "ExampleSync.h" +#include #include #include #include @@ -36,16 +37,19 @@ enum { static std::function logger_callback; +// flawfinder: ignore __attribute__((format(printf, 1, 0))) static void logv(char const* format, va_list args) { - static char msg[1024]; - vsnprintf(msg, sizeof(msg), format, args); - fputs(msg, stderr); + static std::array msg; + // flawfinder: ignore + vsnprintf(msg.data(), msg.size(), format, args); + fputs(msg.data(), stderr); if(logger_callback) - logger_callback(msg); + logger_callback(msg.data()); } +// flawfinder: ignore __attribute__((format(printf, 1, 2))) static void log(char const* format, ...) { va_list args; @@ -534,6 +538,7 @@ int main(int argc, char** argv) #else setvbuf(stdout, nullptr, _IOLBF, 0); #endif + // flawfinder: ignore srand((unsigned int)time(nullptr)); try { diff --git a/include/libstored/debugger.h b/include/libstored/debugger.h index c126fae2..c6a20200 100644 --- a/include/libstored/debugger.h +++ b/include/libstored/debugger.h @@ -1,27 +1,27 @@ #ifndef LIBSTORED_DEBUGGER_H #define LIBSTORED_DEBUGGER_H -// SPDX-FileCopyrightText: 2020-2023 Jochem Rutgers +// SPDX-FileCopyrightText: 2020-2025 Jochem Rutgers // // SPDX-License-Identifier: MPL-2.0 #ifdef __cplusplus -# include -# include -# include -# include -# include -# include +# include +# include +# include +# include +# include +# include -# include -# include -# include -# include -# include +# include +# include +# include +# include +# include -# if STORED_cplusplus >= 201103L -# include -# endif +# if STORED_cplusplus >= 201103L +# include +# endif namespace stored { @@ -58,7 +58,7 @@ class Stream final : public ProtocolLayer { if(blocked()) return; -# ifdef STORED_HAVE_ZTH +# ifdef STORED_HAVE_ZTH // With Zth, the encode context may be different from // Debugger's CmdTrace context. As we pass a buffer pointer to // encode within CmdTrace, this buffer should not be changed @@ -68,7 +68,7 @@ class Stream final : public ProtocolLayer { stored_assert( !Config::AvoidDynamicMemory || m_buffer.size() + len <= m_buffer.capacity()); -# endif +# endif m_buffer.append(static_cast(buffer), len); } @@ -210,9 +210,9 @@ class Stream final : public ProtocolLayer { bool empty() const noexcept { return -# ifdef STORED_HAVE_HEATSHRINK +# ifdef STORED_HAVE_HEATSHRINK m_compress.idle() && -# endif +# endif m_string.empty(); } @@ -259,10 +259,10 @@ class Stream final : public ProtocolLayer { Stream m_string; }; -# ifdef STORED_COMPILER_ARMCC -# pragma clang diagnostic push -# pragma clang diagnostic ignored "-Wnon-virtual-dtor" -# endif +# ifdef STORED_COMPILER_ARMCC +# pragma clang diagnostic push +# pragma clang diagnostic ignored "-Wnon-virtual-dtor" +# endif /*! * \brief Container-template-type-invariant base class of a wrapper for #stored::Variant. */ @@ -365,11 +365,11 @@ class DebugVariantTyped : public DebugVariantBase { explicit DebugVariantTyped(Variant const& variant) : m_variant(variant) { -# if STORED_cplusplus >= 201103L +# if STORED_cplusplus >= 201103L static_assert(std::is_trivially_destructible>::value, ""); -# elif defined(__GCC__) +# elif defined(__GCC__) static_assert(__has_trivial_destructor(Variant), ""); -# endif +# endif } /*! @@ -474,13 +474,13 @@ class DebugVariant final : public DebugVariantBase { sizeof(DebugVariantTyped) == sizeof(DebugVariantTyped<>), ""); // Check if our default copy constructor works properly. -# if STORED_cplusplus >= 201103L +# if STORED_cplusplus >= 201103L static_assert(std::is_trivially_copyable>::value, ""); static_assert(std::is_trivially_destructible>::value, ""); -# elif defined(__GCC__) +# elif defined(__GCC__) static_assert(__has_trivial_copy(Variant), ""); static_assert(__has_trivial_destructor(Variant), ""); -# endif +# endif new(m_buffer) DebugVariantTyped(variant); // Check if the cast of variant() works properly. @@ -568,9 +568,9 @@ class DebugVariant final : public DebugVariantBase { // flawfinder: ignore char m_buffer[sizeof(DebugVariantTyped<>)]; }; -# ifdef STORED_COMPILER_ARMCC -# pragma clang diagnostic pop -# endif +# ifdef STORED_COMPILER_ARMCC +# pragma clang diagnostic pop +# endif /*! * \brief Wrapper for a store, that hides the store's template parameters. @@ -614,14 +614,14 @@ class DebugStoreBase { * #stored::DebugVariant of the object, and the \c arg value, as passed * to \c list(). * - * \see #stored::DebugStoreBase::list(ListCallbackArg*, void*, char const*) const + * \see stored::DebugStoreBase::list(ListCallbackArg*, void*, char const*) const */ -# ifdef DOXYGEN +# ifdef DOXYGEN // breathe breaks on the function typedef. using ListCallbackArg = void(char const*, DebugVariant&, void*); -# else +# else typedef void(ListCallbackArg)(char const*, DebugVariant&, void*); -# endif +# endif /*! * \brief Iterates over the directory and invoke a callback for every object. @@ -798,7 +798,7 @@ class Debugger : public ProtocolLayer { typedef DebugStoreBase::ListCallbackArg ListCallbackArg; void list(ListCallbackArg* f, void* arg = nullptr) const; -# if STORED_cplusplus >= 201103L +# if STORED_cplusplus >= 201103L /*! * \brief Callback function prototype as supplied to \c list(). * @@ -824,7 +824,7 @@ class Debugger : public ProtocolLayer { }; list(static_cast(cb), &f); } -# endif +# endif private: static void listCmdCallback(char const* name, DebugVariant& variant, void* arg); diff --git a/include/libstored/fifo.h b/include/libstored/fifo.h index e51e7fe5..f6ea9076 100644 --- a/include/libstored/fifo.h +++ b/include/libstored/fifo.h @@ -665,12 +665,12 @@ class Fifo { } typedef PopIterator iterator; - constexpr iterator begin() noexcept + iterator begin() noexcept { return PopIterator(*this); } - constexpr iterator end() noexcept + iterator end() noexcept { return PopIterator(); } @@ -1066,12 +1066,12 @@ class MessageFifo { } typedef PopIterator iterator; - constexpr iterator begin() noexcept + iterator begin() noexcept { return PopIterator(*this); } - constexpr iterator end() noexcept + iterator end() noexcept { return PopIterator(); } diff --git a/include/libstored/types.h b/include/libstored/types.h index 47e789d6..c91071d7 100644 --- a/include/libstored/types.h +++ b/include/libstored/types.h @@ -374,11 +374,11 @@ struct fromType { return *this; \ } -# define STORED_VARIABLE_MEMBER_ARITH_OPOP(Class, T, op) \ +# define STORED_VARIABLE_MEMBER_ARITH_OPOP(Class, T, op, opop) \ template < \ typename U = T, \ typename std::enable_if::type), int>::type = 0> \ - Class& operator op##op() noexcept \ + Class& operator opop() noexcept \ { \ return *this op## = 1; \ } \ @@ -386,7 +386,7 @@ struct fromType { template < \ typename U = T, \ typename std::enable_if::type), int>::type = 0> \ - U const operator op##op(int) noexcept \ + U const operator opop(int) noexcept \ { \ U x = get(); \ set(x op 1); \ @@ -409,35 +409,35 @@ struct fromType { return *this; \ } -# define STORED_VARIABLE_MEMBER_ARITH_OPOP(Class, T, op) \ - Class& operator op##op() noexcept \ - { \ - stored_assert(Type::isInt(toType::type)); \ - return *this op## = 1; \ - } \ - \ - T const operator op##op(int) noexcept \ - { \ - stored_assert(Type::isInt(toType::type)); \ - T x = get(); \ - set(x op 1); \ - return x; \ +# define STORED_VARIABLE_MEMBER_ARITH_OPOP(Class, T, op, opop) \ + Class& operator opop() noexcept \ + { \ + stored_assert(Type::isInt(toType::type)); \ + return *this op## = 1; \ + } \ + \ + T const operator opop(int) noexcept \ + { \ + stored_assert(Type::isInt(toType::type)); \ + T x = get(); \ + set(x op 1); \ + return x; \ } # endif // < C++11 -# define STORED_VARIABLE_MEMBER_ARITH_OPS(Class, T) \ - STORED_VARIABLE_MEMBER_ARITH_OP(Class, T, +) \ - STORED_VARIABLE_MEMBER_ARITH_OP(Class, T, -) \ - STORED_VARIABLE_MEMBER_ARITH_OP(Class, T, *) \ - STORED_VARIABLE_MEMBER_ARITH_OP(Class, T, /) \ - STORED_VARIABLE_MEMBER_ARITH_OP(Class, T, %) \ - STORED_VARIABLE_MEMBER_ARITH_BITOP(Class, T, &) \ - STORED_VARIABLE_MEMBER_ARITH_BITOP(Class, T, |) \ - STORED_VARIABLE_MEMBER_ARITH_BITOP(Class, T, ^) \ - STORED_VARIABLE_MEMBER_ARITH_BITOP(Class, T, <<) \ - STORED_VARIABLE_MEMBER_ARITH_BITOP(Class, T, >>) \ - STORED_VARIABLE_MEMBER_ARITH_OPOP(Class, T, +) \ - STORED_VARIABLE_MEMBER_ARITH_OPOP(Class, T, -) +# define STORED_VARIABLE_MEMBER_ARITH_OPS(Class, T) \ + STORED_VARIABLE_MEMBER_ARITH_OP(Class, T, +) \ + STORED_VARIABLE_MEMBER_ARITH_OP(Class, T, -) \ + STORED_VARIABLE_MEMBER_ARITH_OP(Class, T, *) \ + STORED_VARIABLE_MEMBER_ARITH_OP(Class, T, /) \ + STORED_VARIABLE_MEMBER_ARITH_OP(Class, T, %) \ + STORED_VARIABLE_MEMBER_ARITH_BITOP(Class, T, &) \ + STORED_VARIABLE_MEMBER_ARITH_BITOP(Class, T, |) \ + STORED_VARIABLE_MEMBER_ARITH_BITOP(Class, T, ^) \ + STORED_VARIABLE_MEMBER_ARITH_BITOP(Class, T, <<) \ + STORED_VARIABLE_MEMBER_ARITH_BITOP(Class, T, >>) \ + STORED_VARIABLE_MEMBER_ARITH_OPOP(Class, T, +, ++) \ + STORED_VARIABLE_MEMBER_ARITH_OPOP(Class, T, -, --) template class Variant; diff --git a/sphinx/doc/cpp_poller.rst b/sphinx/doc/cpp_poller.rst index 7973c216..4c5f9103 100644 --- a/sphinx/doc/cpp_poller.rst +++ b/sphinx/doc/cpp_poller.rst @@ -90,7 +90,7 @@ stored::PollableZmqLayer .. doxygenclass:: stored::PollableZmqLayer -.. doxygenfunction:: stored::pollable(ZmqLayer &l, Pollable::Events const &events, void *user = nullptr) +.. doxygenfunction:: stored::pollable(ZmqBaseLayer &l, Pollable::Events const &events, void *user = nullptr) .. dummy* stored::PollableZmqSocket From c24b4482893e4511fe7a9416ce7d1c5db7f0f37f Mon Sep 17 00:00:00 2001 From: Jochem Rutgers <68805714+jhrutgers@users.noreply.github.com> Date: Fri, 28 Nov 2025 17:11:34 +0100 Subject: [PATCH 13/14] match test with new arq --- tests/test_protocol.cpp | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/tests/test_protocol.cpp b/tests/test_protocol.cpp index 883e8621..9f1b6aa8 100644 --- a/tests/test_protocol.cpp +++ b/tests/test_protocol.cpp @@ -12,11 +12,11 @@ #include #include -#define DECODE(stack, str) \ - do { \ - char msg_[] = "" str; \ - (stack).decode(msg_, sizeof(msg_) - 1); \ - } while(0) +#define DECODE(stack, str) \ + do { \ + char msg_[] = "" str; \ + (stack).decode(msg_, sizeof(msg_) - 1); \ + } while(0) namespace { @@ -912,7 +912,9 @@ TEST(ArqLayer, Retransmit) top.encode(" 1", 2); EXPECT_EQ(bottom.encoded().at(0), "\x01 1"); - top.encode(" 2", 2); // triggers retransmit of 1 + top.encode(" 2", 2); // Does not retransmit. + EXPECT_EQ(bottom.encoded().size(), 1); + l.keepAlive(); // triggers retransmit of 1 EXPECT_EQ(bottom.encoded().at(1), "\x01 1"); top.flush(); @@ -1023,6 +1025,7 @@ TEST(ArqLayer, Reconnect) EXPECT_EQ(bottom.encoded().at(2), "\x80\x40"); top.encode(" 3", 2); + l.keepAlive(); EXPECT_EQ(bottom.encoded().at(3), "\x40"); // retransmit DECODE(bottom, "\x40"); @@ -1030,25 +1033,27 @@ TEST(ArqLayer, Reconnect) // Separate ack/reset does not fully reconnect; expect reset. DECODE(bottom, "\xc0"); - EXPECT_EQ(bottom.encoded().at(5), "\x01 3"); + l.keepAlive(); // nop, no retransmit after reset + EXPECT_EQ(bottom.encoded().at(5), "\x41"); DECODE(bottom, "\x40"); EXPECT_EQ(bottom.encoded().at(6), "\x80\x40"); // full reset again // In same message, reconnection completes. DECODE(bottom, "\xc0\x40"); - EXPECT_EQ(bottom.encoded().at(7), "\x80\x01 3"); + EXPECT_EQ(bottom.encoded().at(7), "\x80"); DECODE(bottom, "\x81"); top.encode(" 4", 2); - EXPECT_EQ(bottom.encoded().at(8), "\x02 4"); - DECODE(bottom, "\x82"); + EXPECT_EQ(bottom.encoded().at(8), "\x01 4"); + DECODE(bottom, "\x81"); top.encode(" 5", 2); - EXPECT_EQ(bottom.encoded().at(9), "\x03 5"); + EXPECT_EQ(bottom.encoded().at(9), "\x02 5"); DECODE(bottom, "\x40"); EXPECT_EQ(bottom.encoded().at(10), "\x80\x40"); DECODE(bottom, "\x80"); - EXPECT_EQ(bottom.encoded().at(11), "\x01 5"); + l.keepAlive(); + EXPECT_EQ(bottom.encoded().at(11), "\x41"); } TEST(ArqLayer, Reconnect2) @@ -1061,6 +1066,9 @@ TEST(ArqLayer, Reconnect2) LoggingLayer lb; lb.stack(b); + // Complete handshake. + a.keepAlive(); + la.encode(" 1", 2); EXPECT_EQ(lb.decoded().at(0), " 1"); From 90f9b0d3dbb5f1ec14def23a770d7874af5b5df7 Mon Sep 17 00:00:00 2001 From: Jochem Rutgers <68805714+jhrutgers@users.noreply.github.com> Date: Fri, 28 Nov 2025 17:11:41 +0100 Subject: [PATCH 14/14] compile fix --- .vscode/.gitignore | 4 ---- include/libstored/types.h | 4 ++-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/.vscode/.gitignore b/.vscode/.gitignore index b5dac741..450a2340 100644 --- a/.vscode/.gitignore +++ b/.vscode/.gitignore @@ -1,5 +1 @@ -# SPDX-FileCopyrightText: 2020-2024 Jochem Rutgers -# -# SPDX-License-Identifier: CC0-1.0 - launch.json diff --git a/include/libstored/types.h b/include/libstored/types.h index c91071d7..10bb9a12 100644 --- a/include/libstored/types.h +++ b/include/libstored/types.h @@ -386,7 +386,7 @@ struct fromType { template < \ typename U = T, \ typename std::enable_if::type), int>::type = 0> \ - U const operator opop(int) noexcept \ + U operator opop(int) noexcept /* NOLINT(cert-dcl21-cpp) */ \ { \ U x = get(); \ set(x op 1); \ @@ -416,7 +416,7 @@ struct fromType { return *this op## = 1; \ } \ \ - T const operator opop(int) noexcept \ + T operator opop(int) noexcept /* NOLINT(cert-dcl21-cpp) */ \ { \ stored_assert(Type::isInt(toType::type)); \ T x = get(); \