From 810da825bb391b951d8e44018975724f7157611c Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 26 Sep 2025 20:42:40 +0200 Subject: [PATCH 01/43] Initial impl --- include/boost/redis/connection.hpp | 51 +++++++++++++++++++++++++++--- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index b283af27..419d4949 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -28,12 +28,17 @@ #include #include +#include #include +#include #include #include #include +#include +#include #include #include +#include #include #include #include @@ -87,6 +92,7 @@ struct connection_impl { connection_logger logger_; read_buffer read_buffer_; generic_response setup_resp_; + asio::cancellation_signal run_signal_; using executor_type = Executor; @@ -386,6 +392,7 @@ class run_op { system::error_code ec3, system::error_code) { + // TODO: check system::error_code final_ec; if (order[0] == 0 && !!ec0) { @@ -402,7 +409,6 @@ class run_op { (*this)(self, final_ec); } - // TODO: this op doesn't handle per-operation cancellation correctly template void operator()(Self& self, system::error_code ec = {}) { @@ -426,6 +432,12 @@ class run_op { BOOST_ASIO_CORO_YIELD conn_->stream_.async_connect(&conn_->cfg_, &conn_->logger_, std::move(self)); + // Check for cancellations + if (is_cancelled(self)) { + self.complete(asio::error::operation_aborted); + return; + } + // If we were successful, run all the connection tasks if (!ec) { conn_->read_buffer_.clear(); @@ -468,6 +480,12 @@ class run_op { conn_->cancel(operation::receive); } + // Check for cancellations + if (is_cancelled(self)) { + self.complete(asio::error::operation_aborted); + return; + } + // If we are not going to try again, we're done if (!conn_->will_reconnect()) { self.complete(ec); @@ -479,9 +497,9 @@ class run_op { BOOST_ASIO_CORO_YIELD conn_->reconnect_timer_.async_wait(std::move(self)); - // If the timer was cancelled, exit - if (ec) { - self.complete(ec); + // Check for cancellations + if (is_cancelled(self)) { + self.complete(asio::error::operation_aborted); return; } @@ -497,6 +515,17 @@ class run_op { logger make_stderr_logger(logger::level lvl, std::string prefix); +class run_cancel_handler { + asio::cancellation_signal* sig_; + +public: + explicit run_cancel_handler(asio::cancellation_signal& sig) noexcept + : sig_(&sig) + { } + + void operator()(asio::cancellation_type_t cancel_type) const { sig_->emit(cancel_type); } +}; + } // namespace detail /** @brief A SSL connection to the Redis server. @@ -644,9 +673,21 @@ class basic_connection { impl_->health_checker_.set_config(cfg); impl_->read_buffer_.set_config({cfg.read_buffer_append_size, cfg.max_read_size}); + // If the token's slot has cancellation enabled, it should just emit + // the cancellation signal in our connection. This lets us unify the cancel() + // function and per-operation cancellation + auto slot = asio::get_associated_cancellation_slot(token); + if (slot.is_connected()) { + slot.template emplace(impl_->run_signal_); + } + + // Overwrite the token's cancellation slot: the composed operation + // should use the signal's slot so we can generate cancellations in cancel() return asio::async_compose( detail::run_op{impl_.get()}, - token, + asio::bind_cancellation_slot( + impl_->run_signal_.slot(), + std::forward(token)), impl_->writer_timer_); } From 20e8f760ffbcb54446f467a7e80bbf98e181b9df Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Sat, 27 Sep 2025 18:56:42 +0200 Subject: [PATCH 02/43] Remove connection cancellations from reader fsm --- include/boost/redis/detail/reader_fsm.hpp | 2 -- include/boost/redis/impl/reader_fsm.ipp | 27 ++++++++++++----------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/include/boost/redis/detail/reader_fsm.hpp b/include/boost/redis/detail/reader_fsm.hpp index 6892ebc0..68b3bcd8 100644 --- a/include/boost/redis/detail/reader_fsm.hpp +++ b/include/boost/redis/detail/reader_fsm.hpp @@ -26,7 +26,6 @@ class reader_fsm { append_some, needs_more, notify_push_receiver, - cancel_run, done, }; @@ -45,7 +44,6 @@ class reader_fsm { private: int resume_point_{0}; read_buffer* read_buffer_ = nullptr; - action action_after_resume_; action::type next_read_type_ = action::type::append_some; multiplexer* mpx_ = nullptr; std::pair res_{consume_result::needs_more, 0u}; diff --git a/include/boost/redis/impl/reader_fsm.ipp b/include/boost/redis/impl/reader_fsm.ipp index 272a5859..90d662f3 100644 --- a/include/boost/redis/impl/reader_fsm.ipp +++ b/include/boost/redis/impl/reader_fsm.ipp @@ -16,43 +16,46 @@ reader_fsm::reader_fsm(read_buffer& rbuf, multiplexer& mpx) noexcept , mpx_{&mpx} { } +// TODO: write cancellation tests reader_fsm::action reader_fsm::resume( std::size_t bytes_read, system::error_code ec, - asio::cancellation_type_t /*cancel_state*/) + asio::cancellation_type_t cancel_state) { switch (resume_point_) { BOOST_REDIS_CORO_INITIAL BOOST_REDIS_YIELD(resume_point_, 1, action::type::setup_cancellation) for (;;) { + // Prepare the buffer for the read operation ec = read_buffer_->prepare_append(); if (ec) { - action_after_resume_ = {action::type::done, 0, ec}; - BOOST_REDIS_YIELD(resume_point_, 2, action::type::cancel_run) - return action_after_resume_; + return {action::type::done, 0, ec}; } + // Read BOOST_REDIS_YIELD(resume_point_, 3, next_read_type_) + + // Process the bytes read, even if there was an error read_buffer_->commit_append(bytes_read); + + // Check for read errors if (ec) { // TODO: If an error occurred but data was read (i.e. // bytes_read != 0) we should try to process that data and // deliver it to the user before calling cancel_run. - action_after_resume_ = {action::type::done, bytes_read, ec}; - BOOST_REDIS_YIELD(resume_point_, 4, action::type::cancel_run) - return action_after_resume_; + return {action::type::done, bytes_read, ec}; } next_read_type_ = action::type::append_some; + + // Process the data that we've read while (read_buffer_->get_committed_size() != 0) { res_ = mpx_->consume_next(read_buffer_->get_committed_buffer(), ec); if (ec) { // TODO: Perhaps log what has not been consumed to aid // debugging. - action_after_resume_ = {action::type::done, res_.second, ec}; - BOOST_REDIS_YIELD(resume_point_, 5, action::type::cancel_run) - return action_after_resume_; + return {action::type::done, res_.second, ec}; } if (res_.first == consume_result::needs_more) { @@ -65,9 +68,7 @@ reader_fsm::action reader_fsm::resume( if (res_.first == consume_result::got_push) { BOOST_REDIS_YIELD(resume_point_, 6, action::type::notify_push_receiver, res_.second) if (ec) { - action_after_resume_ = {action::type::done, 0u, ec}; - BOOST_REDIS_YIELD(resume_point_, 7, action::type::cancel_run) - return action_after_resume_; + return {action::type::done, 0u, ec}; } } else { // TODO: Here we should notify the exec operation that From ac2b6cd9cb5624d879bac5ab6ff4d9eee5bfbd6f Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Sat, 27 Sep 2025 19:00:09 +0200 Subject: [PATCH 03/43] Proper handling of cancellation in reader_fsm --- include/boost/redis/impl/reader_fsm.ipp | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/include/boost/redis/impl/reader_fsm.ipp b/include/boost/redis/impl/reader_fsm.ipp index 90d662f3..1fce7beb 100644 --- a/include/boost/redis/impl/reader_fsm.ipp +++ b/include/boost/redis/impl/reader_fsm.ipp @@ -9,8 +9,17 @@ #include #include +#include +#include + namespace boost::redis::detail { +// TODO: this is duplicated +inline bool is_terminal_cancellation(asio::cancellation_type_t value) +{ + return (value & asio::cancellation_type_t::terminal) != asio::cancellation_type_t::none; +} + reader_fsm::reader_fsm(read_buffer& rbuf, multiplexer& mpx) noexcept : read_buffer_{&rbuf} , mpx_{&mpx} @@ -47,6 +56,11 @@ reader_fsm::action reader_fsm::resume( return {action::type::done, bytes_read, ec}; } + // Check for cancellations + if (is_terminal_cancellation(cancel_state)) { + return {action::type::done, 0u, asio::error::operation_aborted}; + } + next_read_type_ = action::type::append_some; // Process the data that we've read @@ -70,6 +84,9 @@ reader_fsm::action reader_fsm::resume( if (ec) { return {action::type::done, 0u, ec}; } + if (is_terminal_cancellation(cancel_state)) { + return {action::type::done, 0u, asio::error::operation_aborted}; + } } else { // TODO: Here we should notify the exec operation that // it can be completed. This will improve log clarity From b81b80a8ab2aa9ebaf453bc3c2ef50158b2ba9e7 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Sat, 27 Sep 2025 19:03:23 +0200 Subject: [PATCH 04/43] writer_op cancellations --- include/boost/redis/connection.hpp | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 419d4949..3595848d 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -228,31 +228,28 @@ struct writer_op { if (ec) { conn_->logger_.trace("writer_op (1)", ec); - conn_->cancel(operation::run); self.complete(ec); return; } conn_->mpx_.commit_write(); - // A socket.close() may have been called while a - // successful write might had already been queued, so we - // have to check here before proceeding. - if (!conn_->is_open()) { - conn_->logger_.trace("writer_op (2): connection is closed."); - self.complete({}); + // Check for cancellations + if (is_cancelled(self)) { + conn_->logger_.trace("writer_op (2): cancelled"); + self.complete(asio::error::operation_aborted); return; } } + // Wait for data to be available BOOST_ASIO_CORO_YIELD conn_->writer_timer_.async_wait(std::move(self)); - if (!conn_->is_open()) { - conn_->logger_.trace("writer_op (3): connection is closed."); - // Notice this is not an error of the op, stoping was - // requested from the outside, so we complete with - // success. - self.complete({}); + + // Check for cancellations + if (is_cancelled(self)) { + conn_->logger_.trace("writer_op (3): cancelled"); + self.complete(asio::error::operation_aborted); return; } } From 02166eb87e32f09126b97d16a21a1d17cb53cefa Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 1 Oct 2025 19:23:02 +0200 Subject: [PATCH 05/43] Parallel group handling --- include/boost/redis/connection.hpp | 70 ++++++++++++++++++++---------- 1 file changed, 48 insertions(+), 22 deletions(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 8e7f1550..ff7a6e7b 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -340,8 +340,11 @@ struct health_checker_op { { if (conn_->cfg_.health_check_interval == std::chrono::seconds::zero()) { conn_->logger_.trace("ping_op (1): timeout disabled."); - BOOST_ASIO_CORO_YIELD asio::async_immediate(self.get_io_executor(), std::move(self)); - self.complete(system::error_code{}); + + // Wait until we're cancelled. This simplifies parallel group handling a lot + conn_->ping_timer_.expires_after((std::chrono::steady_clock::duration::max)()); + BOOST_ASIO_CORO_YIELD conn_->ping_timer_.async_wait(std::move(self)); + self.complete(asio::error::operation_aborted); return; } @@ -413,6 +416,40 @@ inline system::error_code check_config(const config& cfg) return system::error_code{}; } +inline system::error_code translate_parallel_group_errors( + std::array order, + system::error_code setup_ec, + system::error_code health_check_ec, + system::error_code reader_ec, + system::error_code writer_ec) +{ + // The setup request is special: it might complete successfully, + // without causing the other tasks to exit. + // The other tasks will always complete with an error. + if (order[0] == 0u) { + // The setup request finished first. If it failed with an error, + // this is the cause of the problem + if (setup_ec) + return setup_ec; + + // Otherwise, we need to look at which task finished next + if (order[1] == 1u) + return health_check_ec; + else if (order[2] == 1u) + return reader_ec; + else + return writer_ec; + } else { + // Look at the other tasks and see which one finished first + if (order[1] == 0u) + return health_check_ec; + else if (order[2] == 0u) + return reader_ec; + else + return writer_ec; + } +} + template class run_op { private: @@ -420,8 +457,6 @@ class run_op { asio::coroutine coro_{}; system::error_code stored_ec_; - using order_t = std::array; - static system::error_code on_setup_finished( connection_impl& conn, system::error_code ec) @@ -491,27 +526,15 @@ class run_op { template void operator()( Self& self, - order_t order, + std::array order, system::error_code setup_ec, system::error_code health_check_ec, system::error_code reader_ec, - system::error_code /* writer_ec */) + system::error_code writer_ec) { - // TODO: check - system::error_code final_ec; - - if (order[0] == 0 && !!setup_ec) { - // The setup op finished first and with an error - final_ec = setup_ec; - } else if (order[0] == 1 && health_check_ec == error::pong_timeout) { - // The check ping timeout finished first. Use the ping error code - final_ec = health_check_ec; - } else { - // Use the reader error code - final_ec = reader_ec; - } - - (*this)(self, final_ec); + (*this)( + self, + translate_parallel_group_errors(order, setup_ec, health_check_ec, reader_ec, writer_ec)); } template @@ -786,7 +809,10 @@ class basic_connection { // Overwrite the token's cancellation slot: the composed operation // should use the signal's slot so we can generate cancellations in cancel() - return asio::async_compose( + using token_t = decltype(asio::bind_cancellation_slot( + impl_->run_signal_.slot(), + std::forward(token))); + return asio::async_compose( detail::run_op{impl_.get()}, asio::bind_cancellation_slot( impl_->run_signal_.slot(), From 856e2ffc026a902ba27a742a775922824402674d Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 1 Oct 2025 19:25:01 +0200 Subject: [PATCH 06/43] Fix async_compose --- include/boost/redis/connection.hpp | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index ff7a6e7b..1909055b 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -809,14 +809,12 @@ class basic_connection { // Overwrite the token's cancellation slot: the composed operation // should use the signal's slot so we can generate cancellations in cancel() - using token_t = decltype(asio::bind_cancellation_slot( + auto token_with_slot = asio::bind_cancellation_slot( impl_->run_signal_.slot(), - std::forward(token))); - return asio::async_compose( + std::forward(token)); + return asio::async_compose( detail::run_op{impl_.get()}, - asio::bind_cancellation_slot( - impl_->run_signal_.slot(), - std::forward(token)), + token_with_slot, impl_->writer_timer_); } From f80e46543eeae0552f5ae1b1da8be0f51c1eafe8 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 1 Oct 2025 19:25:32 +0200 Subject: [PATCH 07/43] Remove action from logger --- include/boost/redis/impl/connection_logger.ipp | 1 - 1 file changed, 1 deletion(-) diff --git a/include/boost/redis/impl/connection_logger.ipp b/include/boost/redis/impl/connection_logger.ipp index c5b32677..fae04515 100644 --- a/include/boost/redis/impl/connection_logger.ipp +++ b/include/boost/redis/impl/connection_logger.ipp @@ -29,7 +29,6 @@ auto to_string(reader_fsm::action::type t) noexcept -> char const* BOOST_REDIS_READER_SWITCH_CASE(read_some); BOOST_REDIS_READER_SWITCH_CASE(needs_more); BOOST_REDIS_READER_SWITCH_CASE(notify_push_receiver); - BOOST_REDIS_READER_SWITCH_CASE(cancel_run); BOOST_REDIS_READER_SWITCH_CASE(done); default: return "action::type::"; } From 1d1be62a1cef659198bf98ca182a4e850e1aa52f Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 1 Oct 2025 19:31:06 +0200 Subject: [PATCH 08/43] Fix test_reader_fsm --- test/test_reader_fsm.cpp | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/test/test_reader_fsm.cpp b/test/test_reader_fsm.cpp index 27f4135f..46264446 100644 --- a/test/test_reader_fsm.cpp +++ b/test/test_reader_fsm.cpp @@ -166,11 +166,6 @@ void test_read_error() // Deliver the data act = fsm.resume(payload.size(), {net::error::operation_aborted}, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::cancel_run); - BOOST_TEST_EQ(act.ec_, error_code()); - - // Finish - act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::done); BOOST_TEST_EQ(act.ec_, error_code{net::error::operation_aborted}); } @@ -196,11 +191,6 @@ void test_parse_error() // Deliver the data act = fsm.resume(payload.size(), {}, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::cancel_run); - BOOST_TEST_EQ(act.ec_, error_code()); - - // Finish - act = fsm.resume(0, {}, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::done); BOOST_TEST_EQ(act.ec_, error_code{redis::error::not_a_number}); } @@ -231,10 +221,6 @@ void test_push_deliver_error() // Resumes from notifying a push with an error. act = fsm.resume(0, net::error::operation_aborted, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::cancel_run); - - // Finish - act = fsm.resume(0, {}, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::done); BOOST_TEST_EQ(act.ec_, error_code{net::error::operation_aborted}); } @@ -263,10 +249,6 @@ void test_max_read_buffer_size() std::string const part1 = ">3\r\n"; copy_to(mpx, part1); act = fsm.resume(part1.size(), {}, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::cancel_run); - BOOST_TEST_EQ(act.ec_, error_code()); - - act = fsm.resume({}, {}, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::done); BOOST_TEST_EQ(act.ec_, redis::error::exceeds_maximum_read_buffer_size); } From 2392e30f0b2a093aaeacb80af3803d24d4328224 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 1 Oct 2025 19:39:29 +0200 Subject: [PATCH 09/43] Adjust cancel --- include/boost/redis/connection.hpp | 52 ++++++++++----------- include/boost/redis/detail/redis_stream.hpp | 14 ------ 2 files changed, 24 insertions(+), 42 deletions(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 1909055b..15db5c85 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -144,29 +144,6 @@ struct connection_impl { writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)()); } - void cancel(operation op) - { - switch (op) { - case operation::resolve: stream_.cancel_resolve(); break; - case operation::exec: mpx_.cancel_waiting(); break; - case operation::reconnection: - cfg_.reconnect_wait_interval = std::chrono::seconds::zero(); - break; - case operation::run: cancel_run(); break; - case operation::receive: receive_channel_.cancel(); break; - case operation::health_check: ping_timer_.cancel(); break; - case operation::all: - stream_.cancel_resolve(); - cfg_.reconnect_wait_interval = std::chrono::seconds::zero(); - ping_timer_.cancel(); - cancel_run(); // run - receive_channel_.cancel(); // receive - mpx_.cancel_waiting(); // exec - break; - default: /* ignore */; - } - } - void cancel_run() { stream_.close(); @@ -300,6 +277,7 @@ struct health_checker_op { connection_impl* conn_; asio::coroutine coro_{}; + // TODO: properly check for timeouts vs. cancellations here system::error_code check_errors(system::error_code io_ec) { // Did we have a timeout? @@ -463,9 +441,6 @@ class run_op { { ec = check_setup_response(ec, conn.setup_resp_); conn.logger_.on_setup(ec, conn.setup_resp_); - if (ec) { - conn.cancel(operation::run); - } return ec; } @@ -604,7 +579,7 @@ class run_op { // The receive operation must be cancelled because channel // subscription does not survive a reconnection but requires // re-subscription. - conn_->cancel(operation::receive); + conn_->receive_channel_.cancel(); } // Check for cancellations @@ -1042,7 +1017,28 @@ class basic_connection { * * @param op The operation to be cancelled. */ - void cancel(operation op = operation::all) { impl_->cancel(op); } + void cancel(operation op = operation::all) + { + switch (op) { + case operation::exec: impl_->mpx_.cancel_waiting(); break; + case operation::receive: impl_->receive_channel_.cancel(); break; + case operation::reconnection: + impl_->cfg_.reconnect_wait_interval = std::chrono::seconds::zero(); + break; + case operation::resolve: + case operation::run: + case operation::health_check: + impl_->run_signal_.emit(asio::cancellation_type_t::terminal); + break; + case operation::all: + impl_->mpx_.cancel_waiting(); // exec + impl_->receive_channel_.cancel(); // receive + impl_->cfg_.reconnect_wait_interval = std::chrono::seconds::zero(); // reconnect + impl_->run_signal_.emit(asio::cancellation_type_t::terminal); // rest + break; + default: /* ignore */; + } + } /// Returns true if the connection will try to reconnect if an error is encountered. bool will_reconnect() const noexcept { return impl_->will_reconnect(); } diff --git a/include/boost/redis/detail/redis_stream.hpp b/include/boost/redis/detail/redis_stream.hpp index e8590c70..e1c5070e 100644 --- a/include/boost/redis/detail/redis_stream.hpp +++ b/include/boost/redis/detail/redis_stream.hpp @@ -268,20 +268,6 @@ class redis_stream { default: BOOST_ASSERT(false); } } - - // Cleanup - void cancel_resolve() { resolv_.cancel(); } - - void close() - { - system::error_code ec; - if (stream_.next_layer().is_open()) - stream_.next_layer().close(ec); -#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS - if (unix_socket_.is_open()) - unix_socket_.close(ec); -#endif - } }; } // namespace detail From 33058f97d7da8b99767994b4faf1723ff6c0557b Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 1 Oct 2025 19:44:12 +0200 Subject: [PATCH 10/43] Fix problem with zero ping timeouts --- include/boost/redis/connection.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 15db5c85..bc8c901c 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -320,7 +320,7 @@ struct health_checker_op { conn_->logger_.trace("ping_op (1): timeout disabled."); // Wait until we're cancelled. This simplifies parallel group handling a lot - conn_->ping_timer_.expires_after((std::chrono::steady_clock::duration::max)()); + conn_->ping_timer_.expires_at((std::chrono::steady_clock::time_point::max)()); BOOST_ASIO_CORO_YIELD conn_->ping_timer_.async_wait(std::move(self)); self.complete(asio::error::operation_aborted); return; From 690b2cd0b06aa27e46a4b9cb0077e2683eb9d967 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 1 Oct 2025 19:50:54 +0200 Subject: [PATCH 11/43] Fix UNIX socket reconnection --- include/boost/redis/detail/redis_stream.hpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/include/boost/redis/detail/redis_stream.hpp b/include/boost/redis/detail/redis_stream.hpp index e1c5070e..0095958c 100644 --- a/include/boost/redis/detail/redis_stream.hpp +++ b/include/boost/redis/detail/redis_stream.hpp @@ -99,6 +99,9 @@ class redis_stream { if (obj.transport_ == transport_type::unix_socket) { #ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS + // Discard any existing state + obj.unix_socket_.close(ec); + // Directly connect to the socket BOOST_ASIO_CORO_YIELD obj.unix_socket_.async_connect( @@ -122,6 +125,7 @@ class redis_stream { // Must be done before anything else is done on the stream if (cfg->use_ssl && obj.ssl_stream_used_) obj.reset_stream(); + // TODO: do we need to do this for TCP too? do we need a test? BOOST_ASIO_CORO_YIELD obj.resolv_.async_resolve( From 860f53b41c1972a79b12109bd066356dab3a457b Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 1 Oct 2025 19:52:03 +0200 Subject: [PATCH 12/43] Remove race in UNIX socket test --- test/test_unix_sockets.cpp | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/test/test_unix_sockets.cpp b/test/test_unix_sockets.cpp index abfcb278..75198157 100644 --- a/test/test_unix_sockets.cpp +++ b/test/test_unix_sockets.cpp @@ -81,6 +81,9 @@ void test_reconnection() cfg.reconnect_wait_interval = 10ms; // make the test run faster request ping_request; + ping_request.get_config().cancel_if_not_connected = false; + ping_request.get_config().cancel_if_unresponded = false; + ping_request.get_config().cancel_on_connection_lost = false; ping_request.push("PING", "some_value"); request quit_request; @@ -103,12 +106,6 @@ void test_reconnection() auto quit_callback = [&](error_code ec, std::size_t) { BOOST_TEST(ec == error_code()); - - // If a request is issued immediately after QUIT, the request sometimes - // fails, probably due to a race condition. This dispatches any pending - // handlers, triggering the reconnection process. - // TODO: this should not be required. - ioc.poll(); conn.async_exec(ping_request, ignore, ping_callback); }; From 87dba0315f0617be8123f4cd45ce58f18d9c4f8f Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 1 Oct 2025 20:39:24 +0200 Subject: [PATCH 13/43] Fixed bug handling parallel group result --- include/boost/redis/connection.hpp | 48 +++++++++++++----------------- 1 file changed, 21 insertions(+), 27 deletions(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index bc8c901c..203d2ba7 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -49,6 +49,7 @@ #include #include #include +#include #include #include @@ -404,27 +405,20 @@ inline system::error_code translate_parallel_group_errors( // The setup request is special: it might complete successfully, // without causing the other tasks to exit. // The other tasks will always complete with an error. - if (order[0] == 0u) { - // The setup request finished first. If it failed with an error, - // this is the cause of the problem - if (setup_ec) - return setup_ec; - - // Otherwise, we need to look at which task finished next - if (order[1] == 1u) - return health_check_ec; - else if (order[2] == 1u) - return reader_ec; - else - return writer_ec; - } else { - // Look at the other tasks and see which one finished first - if (order[1] == 0u) - return health_check_ec; - else if (order[2] == 0u) - return reader_ec; - else - return writer_ec; + + // If the setup task errored and was the first to exit, use its code + if (order[0] == 0u && setup_ec) { + return setup_ec; + } + + // Use the code corresponding to the task that finished first, + // excluding the setup task + std::size_t task_number = order[0] == 0u ? order[1] : order[0]; + switch (task_number) { + case 1u: return health_check_ec; + case 2u: return reader_ec; + case 3u: return writer_ec; + default: BOOST_ASSERT(false); return system::error_code(); } } @@ -582,18 +576,18 @@ class run_op { conn_->receive_channel_.cancel(); } - // Check for cancellations - if (is_cancelled(self)) { - self.complete(asio::error::operation_aborted); - return; - } - // If we are not going to try again, we're done if (!conn_->will_reconnect()) { self.complete(ec); return; } + // Check for cancellations + if (is_cancelled(self)) { + self.complete(asio::error::operation_aborted); + return; + } + // Wait for the reconnection interval conn_->reconnect_timer_.expires_after(conn_->cfg_.reconnect_wait_interval); BOOST_ASIO_CORO_YIELD From c76267dca8ea478f1729dcf685070abeba359d89 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 1 Oct 2025 20:39:46 +0200 Subject: [PATCH 14/43] Improve push test --- test/test_conn_push.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/test_conn_push.cpp b/test/test_conn_push.cpp index 3708cfd2..1049c615 100644 --- a/test/test_conn_push.cpp +++ b/test/test_conn_push.cpp @@ -210,7 +210,6 @@ BOOST_AUTO_TEST_CASE(test_push_adapter) conn->async_receive([&, conn](error_code ec, std::size_t) { BOOST_CHECK_EQUAL(ec, boost::asio::experimental::error::channel_cancelled); - conn->cancel(operation::reconnection); push_received = true; }); @@ -220,7 +219,8 @@ BOOST_AUTO_TEST_CASE(test_push_adapter) }); auto cfg = make_test_config(); - conn->async_run(cfg, {}, [&run_finished](error_code ec) { + cfg.reconnect_wait_interval = 0s; + conn->async_run(cfg, [&run_finished](error_code ec) { BOOST_CHECK_EQUAL(ec, redis::error::incompatible_size); run_finished = true; }); From efe0b931fe3476a9a96792a23dd0539e83bbe737 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Thu, 2 Oct 2025 17:45:34 +0200 Subject: [PATCH 15/43] Fix test_conn_exec_retry --- test/test_conn_exec_retry.cpp | 32 +++++++++++--------------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/test/test_conn_exec_retry.cpp b/test/test_conn_exec_retry.cpp index a01c63dd..e9d941c5 100644 --- a/test/test_conn_exec_retry.cpp +++ b/test/test_conn_exec_retry.cpp @@ -30,7 +30,7 @@ using namespace std::chrono_literals; namespace { -BOOST_AUTO_TEST_CASE(request_retry_false) +BOOST_AUTO_TEST_CASE(request_cancel_if_unresponded_true) { request req0; req0.get_config().cancel_on_connection_lost = true; @@ -105,8 +105,12 @@ BOOST_AUTO_TEST_CASE(request_retry_false) BOOST_TEST(run_finished); } -BOOST_AUTO_TEST_CASE(request_retry_true) +BOOST_AUTO_TEST_CASE(request_cancel_if_unresponded_false) { + // The BLPOP request will block forever, causing the health checker + // to trigger a reconnection. Although req2 has been written, + // it has cancel_if_unresponded=false, so it will be retried + // after reconnection request req0; req0.get_config().cancel_on_connection_lost = true; req0.push("HELLO", 3); @@ -126,23 +130,10 @@ BOOST_AUTO_TEST_CASE(request_retry_true) req3.push("QUIT"); net::io_context ioc; - auto conn = std::make_shared(ioc); - - net::steady_timer st{ioc}; - - bool timer_finished = false, c0_called = false, c1_called = false, c2_called = false, - c3_called = false, run_finished = false; + auto conn = std::make_shared(ioc, logger::level::debug); - st.expires_after(std::chrono::seconds{1}); - st.async_wait([&](error_code ec) { - // Cancels the request before receiving the response. This - // should cause the third request to not complete with error - // since it has cancel_if_unresponded = true and cancellation - // comes after it was written. - timer_finished = true; - BOOST_TEST(ec == error_code()); - conn->cancel(operation::run); - }); + bool c0_called = false, c1_called = false, c2_called = false, c3_called = false, + run_finished = false; auto c3 = [&](error_code ec, std::size_t) { c3_called = true; @@ -172,8 +163,8 @@ BOOST_AUTO_TEST_CASE(request_retry_true) conn->async_exec(req0, ignore, c0); auto cfg = make_test_config(); - cfg.health_check_interval = 5s; - conn->async_run(cfg, {}, [&](error_code ec) { + cfg.health_check_interval = 200ms; + conn->async_run(cfg, [&](error_code ec) { run_finished = true; std::cout << ec.message() << std::endl; BOOST_TEST(ec != error_code()); @@ -181,7 +172,6 @@ BOOST_AUTO_TEST_CASE(request_retry_true) ioc.run_for(test_timeout); - BOOST_TEST(timer_finished); BOOST_TEST(c0_called); BOOST_TEST(c1_called); BOOST_TEST(c2_called); From d2f85be5230c9cc10ce1ba64d253e9975dbcb32a Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Thu, 2 Oct 2025 17:47:20 +0200 Subject: [PATCH 16/43] Rename test --- test/test_conn_check_health.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/test_conn_check_health.cpp b/test/test_conn_check_health.cpp index 8be5ecf2..aecedee6 100644 --- a/test/test_conn_check_health.cpp +++ b/test/test_conn_check_health.cpp @@ -25,7 +25,8 @@ using namespace std::chrono_literals; namespace { -void test_check_health() +// The health checker detects dead connections and triggers reconnection +void test_check_health_reconnection() { // Setup net::io_context ioc; @@ -79,7 +80,7 @@ void test_check_health() int main() { - test_check_health(); + test_check_health_reconnection(); return boost::report_errors(); } \ No newline at end of file From 0ac54ab0671878c36f435c01ea38844f6eb2bd1d Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Thu, 2 Oct 2025 17:50:42 +0200 Subject: [PATCH 17/43] Failing error code test --- test/test_conn_check_health.cpp | 42 +++++++++++++++++++++++++++++++-- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/test/test_conn_check_health.cpp b/test/test_conn_check_health.cpp index aecedee6..2f493183 100644 --- a/test/test_conn_check_health.cpp +++ b/test/test_conn_check_health.cpp @@ -26,7 +26,7 @@ using namespace std::chrono_literals; namespace { // The health checker detects dead connections and triggers reconnection -void test_check_health_reconnection() +void test_reconnection() { // Setup net::io_context ioc; @@ -76,11 +76,49 @@ void test_check_health_reconnection() BOOST_TEST(exec2_finished); } +// We use the correct error code when a ping times out +void test_error_code() +{ + // Setup + net::io_context ioc; + connection conn{ioc}; + + // This request will block forever, causing the connection to become unresponsive + request req; + req.push("BLPOP", "any", 0); + + // Make the test run faster + auto cfg = make_test_config(); + cfg.health_check_interval = 200ms; + cfg.reconnect_wait_interval = 0s; + + bool run_finished = false, exec_finished = false; + + conn.async_run(cfg, [&](error_code ec) { + run_finished = true; + BOOST_TEST_EQ(ec, boost::redis::error::pong_timeout); + }); + + // This request will complete after the health checker deems the connection + // as unresponsive and triggers a reconnection (it's configured to be cancelled + // on connection lost). + conn.async_exec(req, ignore, [&](error_code ec, std::size_t) { + exec_finished = true; + BOOST_TEST_EQ(ec, net::error::operation_aborted); + }); + + ioc.run_for(test_timeout); + + BOOST_TEST(run_finished); + BOOST_TEST(exec_finished); +} + } // namespace int main() { - test_check_health_reconnection(); + test_reconnection(); + test_error_code(); return boost::report_errors(); } \ No newline at end of file From 43bb37ee5ff8fc900ec9b7ed652982d9f082f379 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Thu, 2 Oct 2025 18:03:30 +0200 Subject: [PATCH 18/43] Proper error code translation in PING --- include/boost/redis/connection.hpp | 31 ++++++++++++------------------ 1 file changed, 12 insertions(+), 19 deletions(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 203d2ba7..d9d144dd 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -278,13 +278,18 @@ struct health_checker_op { connection_impl* conn_; asio::coroutine coro_{}; - // TODO: properly check for timeouts vs. cancellations here - system::error_code check_errors(system::error_code io_ec) + system::error_code check_errors(system::error_code io_ec, asio::cancellation_type_t cancel_state) { - // Did we have a timeout? + // Did we have a cancellation? We might not have an error code here + if ((cancel_state & asio::cancellation_type_t::terminal) != asio::cancellation_type_t::none) { + conn_->logger_.log(logger::level::info, "Health checker: cancelled"); + return asio::error::operation_aborted; + } + + // operation_aborted and no cancel state means that asio::cancel_after timed out if (io_ec == asio::error::operation_aborted) { conn_->logger_.log(logger::level::info, "Health checker: ping timed out"); - return asio::error::operation_aborted; + return error::pong_timeout; } // Did we have other unknown error? @@ -342,15 +347,8 @@ struct health_checker_op { asio::cancel_after(conn->ping_timer_, timeout, std::move(self))); } - // Check for cancellations - if (is_cancelled(self)) { - conn_->logger_.trace("ping_op (2): cancelled"); - self.complete(asio::error::operation_aborted); - return; - } - - // Check for errors in PING - ec = check_errors(ec); + // Check for cancellations and errors in PING + ec = check_errors(ec, self.get_cancellation_state().cancelled()); if (ec) { self.complete(ec); return; @@ -361,14 +359,9 @@ struct health_checker_op { BOOST_ASIO_CORO_YIELD conn_->ping_timer_.async_wait(std::move(self)); - if (ec) { - conn_->logger_.trace("ping_op (3)", ec); - self.complete(ec); - return; - } if (is_cancelled(self)) { - conn_->logger_.trace("ping_op (4): cancelled"); + conn_->logger_.trace("ping_op (2): cancelled"); self.complete(asio::error::operation_aborted); return; } From 7e79751ba0d806cdadf662a184b0bbcc533f0587 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Thu, 2 Oct 2025 18:08:51 +0200 Subject: [PATCH 19/43] PING disabled test --- test/test_conn_check_health.cpp | 42 +++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/test/test_conn_check_health.cpp b/test/test_conn_check_health.cpp index 2f493183..7314c53a 100644 --- a/test/test_conn_check_health.cpp +++ b/test/test_conn_check_health.cpp @@ -113,12 +113,54 @@ void test_error_code() BOOST_TEST(exec_finished); } +// A ping interval of zero disables timeouts (and doesn't cause trouble) +void test_disabled() +{ + // Setup + net::io_context ioc; + connection conn{ioc}; + + // Run a couple of requests to verify that the connection works fine + request req1; + req1.push("PING", "health_check_disabled_1"); + + request req2; + req1.push("PING", "health_check_disabled_2"); + + auto cfg = make_test_config(); + cfg.health_check_interval = 0s; + + bool run_finished = false, exec1_finished = false, exec2_finished = false; + + conn.async_run(cfg, [&](error_code ec) { + run_finished = true; + BOOST_TEST_EQ(ec, net::error::operation_aborted); + }); + + conn.async_exec(req1, ignore, [&](error_code ec, std::size_t) { + exec1_finished = true; + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, [&](error_code ec2, std::size_t) { + exec2_finished = true; + BOOST_TEST_EQ(ec2, error_code()); + conn.cancel(); + }); + }); + + ioc.run_for(test_timeout); + + BOOST_TEST(run_finished); + BOOST_TEST(exec1_finished); + BOOST_TEST(exec2_finished); +} + } // namespace int main() { test_reconnection(); test_error_code(); + test_disabled(); return boost::report_errors(); } \ No newline at end of file From 74a2cece98c4fdb6421280c42fe38dde35af8c25 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Thu, 2 Oct 2025 18:11:02 +0200 Subject: [PATCH 20/43] Remove TODO in redis_stream --- include/boost/redis/detail/redis_stream.hpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/include/boost/redis/detail/redis_stream.hpp b/include/boost/redis/detail/redis_stream.hpp index 0095958c..6f2a272f 100644 --- a/include/boost/redis/detail/redis_stream.hpp +++ b/include/boost/redis/detail/redis_stream.hpp @@ -122,10 +122,11 @@ class redis_stream { } else { // ssl::stream doesn't support being re-used. If we're to use // TLS and the stream has been used, re-create it. - // Must be done before anything else is done on the stream + // Must be done before anything else is done on the stream. + // Note that we don't need to close the socket here because + // range connect does it for us. if (cfg->use_ssl && obj.ssl_stream_used_) obj.reset_stream(); - // TODO: do we need to do this for TCP too? do we need a test? BOOST_ASIO_CORO_YIELD obj.resolv_.async_resolve( From 96e66919af60b57c70dc88d6a3039c0637ea2133 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Thu, 2 Oct 2025 18:13:11 +0200 Subject: [PATCH 21/43] Cleanup TLS reconnection test --- test/test_conn_tls.cpp | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/test/test_conn_tls.cpp b/test/test_conn_tls.cpp index ef49e333..6832a5e6 100644 --- a/test/test_conn_tls.cpp +++ b/test/test_conn_tls.cpp @@ -152,6 +152,8 @@ BOOST_AUTO_TEST_CASE(reconnection) request ping_request; ping_request.push("PING", "some_value"); + ping_request.get_config().cancel_if_unresponded = false; + ping_request.get_config().cancel_on_connection_lost = false; request quit_request; quit_request.push("QUIT"); @@ -173,12 +175,6 @@ BOOST_AUTO_TEST_CASE(reconnection) auto quit_callback = [&](error_code ec, std::size_t) { BOOST_TEST(ec == error_code()); - - // If a request is issued immediately after QUIT, the request sometimes - // fails, probably due to a race condition. This dispatches any pending - // handlers, triggering the reconnection process. - // TODO: this should not be required. - ioc.poll(); conn.async_exec(ping_request, ignore, ping_callback); }; From 7c1f98bc2a284c25b7f14049ee94f9213ca4de86 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Thu, 2 Oct 2025 18:54:42 +0200 Subject: [PATCH 22/43] Proper per-operation/cancel function bridging --- include/boost/redis/connection.hpp | 40 ++++++++++++++------- include/boost/redis/detail/redis_stream.hpp | 6 ++++ 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index d9d144dd..7504eb89 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -147,8 +147,16 @@ struct connection_impl { void cancel_run() { - stream_.close(); - writer_timer_.cancel(); + // Individual operations should see a terminal cancellation, regardless + // of what we got requested. We take enough actions to ensure that this + // doesn't prevent the object from being re-used (e.g. we reset the TLS stream). + run_signal_.emit(asio::cancellation_type_t::terminal); + + // Name resolution doesn't support per-operation cancellation + stream_.cancel_resolve(); + + // Receive is technically not part of run, but we also cancel it for + // simplicity and backwards compatibility. receive_channel_.cancel(); } @@ -604,15 +612,25 @@ class run_op { logger make_stderr_logger(logger::level lvl, std::string prefix); +template class run_cancel_handler { - asio::cancellation_signal* sig_; + connection_impl* conn_; public: - explicit run_cancel_handler(asio::cancellation_signal& sig) noexcept - : sig_(&sig) + explicit run_cancel_handler(connection_impl& conn) noexcept + : conn_(&conn) { } - void operator()(asio::cancellation_type_t cancel_type) const { sig_->emit(cancel_type); } + void operator()(asio::cancellation_type_t cancel_type) const + { + // We support terminal and partial cancellation + constexpr auto mask = asio::cancellation_type_t::terminal | + asio::cancellation_type_t::partial; + + if ((cancel_type & mask) != asio::cancellation_type_t::none) { + conn_->cancel_run(); + } + } }; } // namespace detail @@ -766,7 +784,7 @@ class basic_connection { // function and per-operation cancellation auto slot = asio::get_associated_cancellation_slot(token); if (slot.is_connected()) { - slot.template emplace(impl_->run_signal_); + slot.template emplace>(*impl_); } // Overwrite the token's cancellation slot: the composed operation @@ -1009,19 +1027,17 @@ class basic_connection { switch (op) { case operation::exec: impl_->mpx_.cancel_waiting(); break; case operation::receive: impl_->receive_channel_.cancel(); break; + case operation::resolve: impl_->stream_.cancel_resolve(); break; case operation::reconnection: impl_->cfg_.reconnect_wait_interval = std::chrono::seconds::zero(); break; - case operation::resolve: case operation::run: - case operation::health_check: - impl_->run_signal_.emit(asio::cancellation_type_t::terminal); - break; + case operation::health_check: impl_->cancel_run(); break; case operation::all: impl_->mpx_.cancel_waiting(); // exec impl_->receive_channel_.cancel(); // receive impl_->cfg_.reconnect_wait_interval = std::chrono::seconds::zero(); // reconnect - impl_->run_signal_.emit(asio::cancellation_type_t::terminal); // rest + impl_->cancel_run(); // run break; default: /* ignore */; } diff --git a/include/boost/redis/detail/redis_stream.hpp b/include/boost/redis/detail/redis_stream.hpp index 6f2a272f..670fe114 100644 --- a/include/boost/redis/detail/redis_stream.hpp +++ b/include/boost/redis/detail/redis_stream.hpp @@ -273,6 +273,12 @@ class redis_stream { default: BOOST_ASSERT(false); } } + + // Cancels resolve operations. Resolve operations don't support per-operation + // cancellation, but resolvers have a cancel() function. Resolve operations are + // in general blocking and run in a separate thread. cancel() has effect only + // if the operation hasn't started yet. Still, trying is better than nothing + void cancel_resolve() { resolv_.cancel(); } }; } // namespace detail From c6ba6b3d040c2e2352a3ce3c4c85b4dbab7efd16 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Thu, 2 Oct 2025 19:00:42 +0200 Subject: [PATCH 23/43] Test terminal run cancellation --- test/CMakeLists.txt | 1 + test/test_conn_run_cancel.cpp | 68 +++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+) create mode 100644 test/test_conn_run_cancel.cpp diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 9ef1791d..e5d4e24c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -49,6 +49,7 @@ make_test(test_conn_quit) make_test(test_conn_exec_retry) make_test(test_conn_exec_error) make_test(test_run) +make_test(test_conn_run_cancel) make_test(test_conn_check_health) make_test(test_conn_exec) make_test(test_conn_push) diff --git a/test/test_conn_run_cancel.cpp b/test/test_conn_run_cancel.cpp new file mode 100644 index 00000000..a59fed74 --- /dev/null +++ b/test/test_conn_run_cancel.cpp @@ -0,0 +1,68 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include + +#include +#include +#include +#include + +#include "common.hpp" + +#include + +using boost::system::error_code; +namespace net = boost::asio; +using namespace boost::redis; + +namespace { + +// Per-operation cancellation works for async_run +void test_terminal_cancellation() +{ + // Setup + net::io_context ioc; + connection conn{ioc}; + net::cancellation_signal sig; + + request req; + req.push("PING", "something"); + + bool run_finished = false, exec_finished = false; + + // Run the connection + auto run_cb = [&](error_code ec) { + run_finished = true; + BOOST_TEST_EQ(ec, net::error::operation_aborted); + }; + conn.async_run(make_test_config(), net::bind_cancellation_slot(sig.slot(), run_cb)); + + // Launch a PING + conn.async_exec(req, ignore, [&](error_code ec, std::size_t) { + exec_finished = true; + BOOST_TEST_EQ(ec, error_code()); + sig.emit(net::cancellation_type_t::terminal); + }); + + ioc.run_for(test_timeout); + + // Check + BOOST_TEST(run_finished); + BOOST_TEST(exec_finished); +} + +} // namespace + +int main() +{ + test_terminal_cancellation(); + + return boost::report_errors(); +} From c29525b4221feb902c9a0f4901da2e87b0fd1f75 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Thu, 2 Oct 2025 19:03:27 +0200 Subject: [PATCH 24/43] Test partial cancellation --- test/test_conn_run_cancel.cpp | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/test/test_conn_run_cancel.cpp b/test/test_conn_run_cancel.cpp index a59fed74..769709c4 100644 --- a/test/test_conn_run_cancel.cpp +++ b/test/test_conn_run_cancel.cpp @@ -17,6 +17,8 @@ #include "common.hpp" #include +#include +#include using boost::system::error_code; namespace net = boost::asio; @@ -24,9 +26,11 @@ using namespace boost::redis; namespace { -// Per-operation cancellation works for async_run -void test_terminal_cancellation() +// Terminal and partial cancellation work for async_run +void test_per_operation_cancellation(std::string_view name, net::cancellation_type_t cancel_type) { + std::cerr << "Running test case: " << name << std::endl; + // Setup net::io_context ioc; connection conn{ioc}; @@ -48,7 +52,7 @@ void test_terminal_cancellation() conn.async_exec(req, ignore, [&](error_code ec, std::size_t) { exec_finished = true; BOOST_TEST_EQ(ec, error_code()); - sig.emit(net::cancellation_type_t::terminal); + sig.emit(cancel_type); }); ioc.run_for(test_timeout); @@ -62,7 +66,8 @@ void test_terminal_cancellation() int main() { - test_terminal_cancellation(); + test_per_operation_cancellation("terminal", net::cancellation_type_t::terminal); + test_per_operation_cancellation("partial", net::cancellation_type_t::partial); return boost::report_errors(); } From eefc9622b64cdccfa187f57218355ec9e5818261 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Thu, 2 Oct 2025 19:11:56 +0200 Subject: [PATCH 25/43] Docs --- include/boost/redis/connection.hpp | 61 ++++++++++++++++++------------ 1 file changed, 36 insertions(+), 25 deletions(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 7504eb89..c7c47f77 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -160,6 +160,27 @@ struct connection_impl { receive_channel_.cancel(); } + void cancel(operation op) + { + switch (op) { + case operation::exec: mpx_.cancel_waiting(); break; + case operation::receive: receive_channel_.cancel(); break; + case operation::resolve: stream_.cancel_resolve(); break; + case operation::reconnection: + cfg_.reconnect_wait_interval = std::chrono::seconds::zero(); + break; + case operation::run: + case operation::health_check: cancel_run(); break; + case operation::all: + mpx_.cancel_waiting(); // exec + receive_channel_.cancel(); // receive + cfg_.reconnect_wait_interval = std::chrono::seconds::zero(); // reconnect + cancel_run(); // run + break; + default: /* ignore */; + } + } + bool is_open() const noexcept { return stream_.is_open(); } bool will_reconnect() const noexcept @@ -628,7 +649,7 @@ class run_cancel_handler { asio::cancellation_type_t::partial; if ((cancel_type & mask) != asio::cancellation_type_t::none) { - conn_->cancel_run(); + conn_->cancel(operation::all); } } }; @@ -767,11 +788,20 @@ class basic_connection { * void f(system::error_code); * @endcode * - * For example on how to call this function refer to - * cpp20_intro.cpp or any other example. + * @par Per-operation cancellation + * This operation supports the following cancellation types: * - * @param cfg Configuration parameters. - * @param token Completion token. + * @li `asio::cancellation_type_t::terminal`. + * @li `asio::cancellation_type_t::partial`. + * + * In both cases, cancellation is equivalent to calling @ref basic_connection::cancel, + * without arguments. + * + * For example on how to call this function refer to + * cpp20_intro.cpp or any other example. + * + * @param cfg Configuration parameters. + * @param token Completion token. */ template > auto async_run(config const& cfg, CompletionToken&& token = {}) @@ -1022,26 +1052,7 @@ class basic_connection { * * @param op The operation to be cancelled. */ - void cancel(operation op = operation::all) - { - switch (op) { - case operation::exec: impl_->mpx_.cancel_waiting(); break; - case operation::receive: impl_->receive_channel_.cancel(); break; - case operation::resolve: impl_->stream_.cancel_resolve(); break; - case operation::reconnection: - impl_->cfg_.reconnect_wait_interval = std::chrono::seconds::zero(); - break; - case operation::run: - case operation::health_check: impl_->cancel_run(); break; - case operation::all: - impl_->mpx_.cancel_waiting(); // exec - impl_->receive_channel_.cancel(); // receive - impl_->cfg_.reconnect_wait_interval = std::chrono::seconds::zero(); // reconnect - impl_->cancel_run(); // run - break; - default: /* ignore */; - } - } + void cancel(operation op = operation::all) { impl_->cancel(op); } /// Returns true if the connection will try to reconnect if an error is encountered. bool will_reconnect() const noexcept { return impl_->will_reconnect(); } From 3928762252ff3439f89d4960686450f1694dbcdf Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Thu, 2 Oct 2025 19:21:42 +0200 Subject: [PATCH 26/43] async_receive cancellation docs --- include/boost/redis/connection.hpp | 40 +++++++++++++++++------------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index c7c47f77..3e700e06 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -778,9 +778,8 @@ class basic_connection { * before `async_run` is called will be written to the server immediately. * * When a connection is lost for any reason, a new one is - * established automatically. To disable reconnection call - * `boost::redis::connection::cancel(operation::reconnection)` - * or set @ref boost::redis::config::reconnect_wait_interval to zero. + * established automatically. To disable reconnection + * set @ref boost::redis::config::reconnect_wait_interval to zero. * * The completion token must have the following signature * @@ -881,25 +880,32 @@ class basic_connection { /** @brief Receives server side pushes asynchronously. * - * When pushes arrive and there is no `async_receive` operation in - * progress, pushed data, requests, and responses will be paused - * until `async_receive` is called again. Apps will usually want - * to call `async_receive` in a loop. + * When pushes arrive and there is no `async_receive` operation in + * progress, pushed data, requests, and responses will be paused + * until `async_receive` is called again. Apps will usually want + * to call `async_receive` in a loop. * - * To cancel an ongoing receive operation apps should call - * `basic_connection::cancel(operation::receive)`. + * For an example see cpp20_subscriber.cpp. The completion token must + * have the following signature * - * For an example see cpp20_subscriber.cpp. The completion token must - * have the following signature + * @code + * void f(system::error_code, std::size_t); + * @endcode * - * @code - * void f(system::error_code, std::size_t); - * @endcode + * Where the second parameter is the size of the push received in + * bytes. + * + * @par Per-operation cancellation + * This operation supports the following cancellation types: * - * Where the second parameter is the size of the push received in - * bytes. + * @li `asio::cancellation_type_t::terminal`. + * @li `asio::cancellation_type_t::partial`. + * @li `asio::cancellation_type_t::total`. + * + * Calling @ref basic_connection::cancel (without arguments) will + * also cancel any ongoing receive operations. * - * @param token Completion token. + * @param token Completion token. */ template > auto async_receive(CompletionToken&& token = {}) From 52a12bedb3da87c93fa888ec1206f9a5f3de00e6 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 3 Oct 2025 10:57:39 +0200 Subject: [PATCH 27/43] operation deprecations --- include/boost/redis/operation.hpp | 56 ++++++++++++++++++++++++++++--- 1 file changed, 51 insertions(+), 5 deletions(-) diff --git a/include/boost/redis/operation.hpp b/include/boost/redis/operation.hpp index 624d34b4..da087cc0 100644 --- a/include/boost/redis/operation.hpp +++ b/include/boost/redis/operation.hpp @@ -16,22 +16,68 @@ namespace boost::redis { */ enum class operation { - /// Resolve operation. + /** + * @brief (Deprecated) Resolve operation. + * + * Cancelling a single resolve operation is probably not what you + * want, since there is no way to detect when a connection is performing name resolution. + * Use @ref operation::run to cancel the current @ref basic_connection::async_run operation, + * which includes name resolution. + */ resolve, - /// Connect operation. + + /** + * @brief (Deprecated) Connect operation. + * + * Cancelling a single connect operation is probably not what you + * want, since there is no way to detect when a connection is performing a connect operation. + * Use @ref operation::run to cancel the current @ref basic_connection::async_run operation, + * which includes connection establishment. + */ connect, - /// SSL handshake operation. + + /** + * @brief (Deprecated) SSL handshake operation. + * + * Cancelling a single connect operation is probably not what you + * want, since there is no way to detect when a connection is performing an SSL handshake. + * Use @ref operation::run to cancel the current @ref basic_connection::async_run operation, + * which includes the SSL handshake. + */ ssl_handshake, + /// Refers to `connection::async_exec` operations. exec, + /// Refers to `connection::async_run` operations. run, + /// Refers to `connection::async_receive` operations. receive, - /// Cancels reconnection. + + /** + * @brief (Deprecated) Cancels reconnection. + * + * Cancelling reconnection doesn't really cancel anything. + * It will only prevent further connections attempts from being + * made once the current connection encounters an error. + * + * Use @ref operation::run to cancel the current @ref basic_connection::async_run operation, + * which includes reconnection. If you want to disable reconnection completely, + * set @ref config::reconnect_wait_interval to zero before calling `async_run`. + */ reconnection, - /// Health check operation. + + /** + * @brief (Deprecated) Health check operation. + * + * Cancelling the health checker only is probably not what you want. + * Use @ref operation::run to cancel the current @ref basic_connection::async_run operation, + * which includes the health checker. If you want to disable health checks completely, + * set @ref config::health_check_interval to zero before calling `async_run`. + */ health_check, + /// Refers to all operations. all, }; From 2a50e2efe9f08d29ae78cb1117fb445c37b209b5 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 3 Oct 2025 11:00:32 +0200 Subject: [PATCH 28/43] Adjust to docs --- include/boost/redis/connection.hpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 3e700e06..e0001b0d 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -156,7 +156,7 @@ struct connection_impl { stream_.cancel_resolve(); // Receive is technically not part of run, but we also cancel it for - // simplicity and backwards compatibility. + // backwards compatibility. receive_channel_.cancel(); } @@ -165,12 +165,14 @@ struct connection_impl { switch (op) { case operation::exec: mpx_.cancel_waiting(); break; case operation::receive: receive_channel_.cancel(); break; - case operation::resolve: stream_.cancel_resolve(); break; case operation::reconnection: cfg_.reconnect_wait_interval = std::chrono::seconds::zero(); break; case operation::run: - case operation::health_check: cancel_run(); break; + case operation::resolve: + case operation::connect: + case operation::ssl_handshake: + case operation::health_check: cancel_run(); break; case operation::all: mpx_.cancel_waiting(); // exec receive_channel_.cancel(); // receive @@ -649,7 +651,7 @@ class run_cancel_handler { asio::cancellation_type_t::partial; if ((cancel_type & mask) != asio::cancellation_type_t::none) { - conn_->cancel(operation::all); + conn_->cancel(operation::run); } } }; From 7fa84386d442f24a3f18aae59a63a0237f6c1506 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 3 Oct 2025 11:04:08 +0200 Subject: [PATCH 29/43] Minor doc corrections --- include/boost/redis/connection.hpp | 34 +++++++++++++++--------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index e0001b0d..8db2213a 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -145,21 +145,6 @@ struct connection_impl { writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)()); } - void cancel_run() - { - // Individual operations should see a terminal cancellation, regardless - // of what we got requested. We take enough actions to ensure that this - // doesn't prevent the object from being re-used (e.g. we reset the TLS stream). - run_signal_.emit(asio::cancellation_type_t::terminal); - - // Name resolution doesn't support per-operation cancellation - stream_.cancel_resolve(); - - // Receive is technically not part of run, but we also cancel it for - // backwards compatibility. - receive_channel_.cancel(); - } - void cancel(operation op) { switch (op) { @@ -183,6 +168,21 @@ struct connection_impl { } } + void cancel_run() + { + // Individual operations should see a terminal cancellation, regardless + // of what we got requested. We take enough actions to ensure that this + // doesn't prevent the object from being re-used (e.g. we reset the TLS stream). + run_signal_.emit(asio::cancellation_type_t::terminal); + + // Name resolution doesn't support per-operation cancellation + stream_.cancel_resolve(); + + // Receive is technically not part of run, but we also cancel it for + // backwards compatibility. + receive_channel_.cancel(); + } + bool is_open() const noexcept { return stream_.is_open(); } bool will_reconnect() const noexcept @@ -896,7 +896,7 @@ class basic_connection { * * Where the second parameter is the size of the push received in * bytes. - * + * * @par Per-operation cancellation * This operation supports the following cancellation types: * @@ -904,7 +904,7 @@ class basic_connection { * @li `asio::cancellation_type_t::partial`. * @li `asio::cancellation_type_t::total`. * - * Calling @ref basic_connection::cancel (without arguments) will + * Calling `basic_connection::cancel(operation::receive)` will * also cancel any ongoing receive operations. * * @param token Completion token. From 286ddcb26172413c7d756daef19aa61af3361568 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 3 Oct 2025 11:06:00 +0200 Subject: [PATCH 30/43] Move FSM test fn --- test/test_reader_fsm.cpp | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/test/test_reader_fsm.cpp b/test/test_reader_fsm.cpp index 46264446..47291937 100644 --- a/test/test_reader_fsm.cpp +++ b/test/test_reader_fsm.cpp @@ -12,8 +12,6 @@ #include #include -#include "common.hpp" - namespace net = boost::asio; namespace redis = boost::redis; using boost::system::error_code; @@ -25,6 +23,7 @@ using redis::any_adapter; using redis::config; using action = redis::detail::reader_fsm::action; +// Operators namespace boost::redis::detail { extern auto to_string(reader_fsm::action::type t) noexcept -> char const*; @@ -35,6 +34,10 @@ std::ostream& operator<<(std::ostream& os, reader_fsm::action::type t) return os; } +} // namespace boost::redis::detail + +namespace { + // Copy data into the multiplexer with the following steps // // 1. get_read_buffer @@ -48,11 +51,6 @@ void copy_to(multiplexer& mpx, std::string_view data) std::copy(data.cbegin(), data.cend(), buffer.begin()); } -} // namespace boost::redis::detail - -// Operators -namespace { - void test_push() { multiplexer mpx; From 7aaacf08103bd87745ed7aadfd96a362a1836391 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 3 Oct 2025 11:07:47 +0200 Subject: [PATCH 31/43] Remove unused macros --- include/boost/redis/detail/helper.hpp | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/include/boost/redis/detail/helper.hpp b/include/boost/redis/detail/helper.hpp index d6c2481a..58a7fe97 100644 --- a/include/boost/redis/detail/helper.hpp +++ b/include/boost/redis/detail/helper.hpp @@ -17,18 +17,6 @@ auto is_cancelled(T const& self) return self.get_cancellation_state().cancelled() != asio::cancellation_type_t::none; } -#define BOOST_REDIS_CHECK_OP0(X) \ - if (ec || redis::detail::is_cancelled(self)) { \ - X self.complete(!!ec ? ec : asio::error::operation_aborted); \ - return; \ - } - -#define BOOST_REDIS_CHECK_OP1(X) \ - if (ec || redis::detail::is_cancelled(self)) { \ - X self.complete(!!ec ? ec : asio::error::operation_aborted, {}); \ - return; \ - } - } // namespace boost::redis::detail #endif // BOOST_REDIS_HELPER_HPP From 4656f8dee685a12ef4afa1da546df940712e47aa Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 3 Oct 2025 11:08:48 +0200 Subject: [PATCH 32/43] Remove TODO comments --- include/boost/redis/impl/reader_fsm.ipp | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/include/boost/redis/impl/reader_fsm.ipp b/include/boost/redis/impl/reader_fsm.ipp index 035ea73c..05a97548 100644 --- a/include/boost/redis/impl/reader_fsm.ipp +++ b/include/boost/redis/impl/reader_fsm.ipp @@ -13,8 +13,7 @@ namespace boost::redis::detail { -// TODO: this is duplicated -inline bool is_terminal_cancellation(asio::cancellation_type_t value) +inline bool is_terminal_cancel(asio::cancellation_type_t value) { return (value & asio::cancellation_type_t::terminal) != asio::cancellation_type_t::none; } @@ -23,7 +22,6 @@ reader_fsm::reader_fsm(multiplexer& mpx) noexcept : mpx_{&mpx} { } -// TODO: write cancellation tests reader_fsm::action reader_fsm::resume( std::size_t bytes_read, system::error_code ec, @@ -55,7 +53,7 @@ reader_fsm::action reader_fsm::resume( } // Check for cancellations - if (is_terminal_cancellation(cancel_state)) { + if (is_terminal_cancel(cancel_state)) { return {action::type::done, 0u, asio::error::operation_aborted}; } @@ -80,7 +78,7 @@ reader_fsm::action reader_fsm::resume( if (ec) { return {action::type::done, 0u, ec}; } - if (is_terminal_cancellation(cancel_state)) { + if (is_terminal_cancel(cancel_state)) { return {action::type::done, 0u, asio::error::operation_aborted}; } } else { From b603f0ec4d9ec8eaa8efd52017839a1663d1a819 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 3 Oct 2025 11:10:52 +0200 Subject: [PATCH 33/43] Remove setup cancellation action --- include/boost/redis/connection.hpp | 3 --- include/boost/redis/detail/reader_fsm.hpp | 3 +-- include/boost/redis/impl/connection_logger.ipp | 1 - include/boost/redis/impl/reader_fsm.ipp | 1 - test/test_reader_fsm.cpp | 14 ++------------ 5 files changed, 3 insertions(+), 19 deletions(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 8db2213a..32b85c85 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -280,9 +280,6 @@ struct reader_op { conn_->logger_.on_fsm_resume(act); switch (act.type_) { - case reader_fsm::action::type::setup_cancellation: - self.reset_cancellation_state(asio::enable_terminal_cancellation()); - continue; case reader_fsm::action::type::needs_more: case reader_fsm::action::type::read_some: { diff --git a/include/boost/redis/detail/reader_fsm.hpp b/include/boost/redis/detail/reader_fsm.hpp index 8a90eb46..7d2c6b0c 100644 --- a/include/boost/redis/detail/reader_fsm.hpp +++ b/include/boost/redis/detail/reader_fsm.hpp @@ -22,14 +22,13 @@ class reader_fsm { struct action { enum class type { - setup_cancellation, read_some, needs_more, notify_push_receiver, done, }; - type type_ = type::setup_cancellation; + type type_ = type::done; std::size_t push_size_ = 0u; system::error_code ec_ = {}; }; diff --git a/include/boost/redis/impl/connection_logger.ipp b/include/boost/redis/impl/connection_logger.ipp index fae04515..bdb72b6b 100644 --- a/include/boost/redis/impl/connection_logger.ipp +++ b/include/boost/redis/impl/connection_logger.ipp @@ -25,7 +25,6 @@ namespace boost::redis::detail { auto to_string(reader_fsm::action::type t) noexcept -> char const* { switch (t) { - BOOST_REDIS_READER_SWITCH_CASE(setup_cancellation); BOOST_REDIS_READER_SWITCH_CASE(read_some); BOOST_REDIS_READER_SWITCH_CASE(needs_more); BOOST_REDIS_READER_SWITCH_CASE(notify_push_receiver); diff --git a/include/boost/redis/impl/reader_fsm.ipp b/include/boost/redis/impl/reader_fsm.ipp index 05a97548..0d8a0cef 100644 --- a/include/boost/redis/impl/reader_fsm.ipp +++ b/include/boost/redis/impl/reader_fsm.ipp @@ -29,7 +29,6 @@ reader_fsm::action reader_fsm::resume( { switch (resume_point_) { BOOST_REDIS_CORO_INITIAL - BOOST_REDIS_YIELD(resume_point_, 1, action::type::setup_cancellation) for (;;) { // Prepare the buffer for the read operation diff --git a/test/test_reader_fsm.cpp b/test/test_reader_fsm.cpp index 47291937..8d356475 100644 --- a/test/test_reader_fsm.cpp +++ b/test/test_reader_fsm.cpp @@ -62,8 +62,6 @@ void test_push() // Initiate act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::setup_cancellation); - act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // The fsm is asking for data. @@ -109,8 +107,6 @@ void test_read_needs_more() // Initiate act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::setup_cancellation); - act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // Split the incoming message in three random parts and deliver @@ -154,8 +150,6 @@ void test_read_error() // Initiate act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::setup_cancellation); - act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // The fsm is asking for data. @@ -179,8 +173,6 @@ void test_parse_error() // Initiate act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::setup_cancellation); - act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // The fsm is asking for data. @@ -204,8 +196,6 @@ void test_push_deliver_error() // Initiate act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::setup_cancellation); - act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // The fsm is asking for data. @@ -239,8 +229,6 @@ void test_max_read_buffer_size() // Initiate act = fsm.resume(0, ec, cancellation_type_t::none); - BOOST_TEST_EQ(act.type_, action::type::setup_cancellation); - act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::read_some); // Passes the first part to the fsm. @@ -251,6 +239,8 @@ void test_max_read_buffer_size() BOOST_TEST_EQ(act.ec_, redis::error::exceeds_maximum_read_buffer_size); } +// TODO: cancellations + } // namespace int main() From cc95af0d56f3061f7c920d94bc2b122378b9c167 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 3 Oct 2025 11:16:06 +0200 Subject: [PATCH 34/43] Reorder --- test/test_reader_fsm.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/test_reader_fsm.cpp b/test/test_reader_fsm.cpp index 8d356475..70b00eda 100644 --- a/test/test_reader_fsm.cpp +++ b/test/test_reader_fsm.cpp @@ -245,12 +245,13 @@ void test_max_read_buffer_size() int main() { - test_max_read_buffer_size(); - test_push_deliver_error(); - test_read_needs_more(); test_push(); + test_read_needs_more(); + test_read_error(); test_parse_error(); + test_push_deliver_error(); + test_max_read_buffer_size(); return boost::report_errors(); } From d8818dcf8b995a522b2cf5bd80cbff96066baf60 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 3 Oct 2025 11:20:47 +0200 Subject: [PATCH 35/43] Cancel after read --- test/test_reader_fsm.cpp | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/test/test_reader_fsm.cpp b/test/test_reader_fsm.cpp index 70b00eda..e4019db0 100644 --- a/test/test_reader_fsm.cpp +++ b/test/test_reader_fsm.cpp @@ -12,6 +12,8 @@ #include #include +#include + namespace net = boost::asio; namespace redis = boost::redis; using boost::system::error_code; @@ -240,6 +242,28 @@ void test_max_read_buffer_size() } // TODO: cancellations +void test_cancel_after_read() +{ + multiplexer mpx; + generic_response resp; + mpx.set_receive_adapter(any_adapter{resp}); + reader_fsm fsm{mpx}; + error_code ec; + action act; + + // Initiate + act = fsm.resume(0, ec, cancellation_type_t::none); + BOOST_TEST_EQ(act.type_, action::type::read_some); + + // Deliver a push, and notify a cancellation. + // This can happen if the cancellation signal arrives before the read handler runs + constexpr std::string_view payload = ">1\r\n+msg1\r\n"; + copy_to(mpx, payload); + act = fsm.resume(payload.size(), ec, cancellation_type_t::terminal); + BOOST_TEST_EQ(act.type_, action::type::done); + BOOST_TEST_EQ(act.push_size_, 0u); + BOOST_TEST_EQ(act.ec_, net::error::operation_aborted); +} } // namespace @@ -253,5 +277,7 @@ int main() test_push_deliver_error(); test_max_read_buffer_size(); + test_cancel_after_read(); + return boost::report_errors(); } From cb23d2b5dd35ac0212420fdb3eb3d77b5e9ca410 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 3 Oct 2025 11:23:09 +0200 Subject: [PATCH 36/43] Push delivery cancel test --- test/test_reader_fsm.cpp | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/test/test_reader_fsm.cpp b/test/test_reader_fsm.cpp index e4019db0..af41a0a9 100644 --- a/test/test_reader_fsm.cpp +++ b/test/test_reader_fsm.cpp @@ -241,7 +241,7 @@ void test_max_read_buffer_size() BOOST_TEST_EQ(act.ec_, redis::error::exceeds_maximum_read_buffer_size); } -// TODO: cancellations +// Cancellations void test_cancel_after_read() { multiplexer mpx; @@ -265,6 +265,40 @@ void test_cancel_after_read() BOOST_TEST_EQ(act.ec_, net::error::operation_aborted); } +void test_cancel_after_push_delivery() +{ + multiplexer mpx; + generic_response resp; + mpx.set_receive_adapter(any_adapter{resp}); + reader_fsm fsm{mpx}; + error_code ec; + action act; + + // Initiate + act = fsm.resume(0, ec, cancellation_type_t::none); + BOOST_TEST_EQ(act.type_, action::type::read_some); + + // The fsm is asking for data. + constexpr std::string_view payload = + ">1\r\n+msg1\r\n" + ">1\r\n+msg2 \r\n"; + + copy_to(mpx, payload); + + // Deliver the 1st push + act = fsm.resume(payload.size(), ec, cancellation_type_t::none); + BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); + BOOST_TEST_EQ(act.push_size_, 11u); + BOOST_TEST_EQ(act.ec_, error_code()); + + // We got a cancellation after delivering it. + // This can happen if the cancellation signal arrives before the channel send handler runs + act = fsm.resume(0, ec, cancellation_type_t::terminal); + BOOST_TEST_EQ(act.type_, action::type::done); + BOOST_TEST_EQ(act.push_size_, 0u); + BOOST_TEST_EQ(act.ec_, net::error::operation_aborted); +} + } // namespace int main() @@ -278,6 +312,7 @@ int main() test_max_read_buffer_size(); test_cancel_after_read(); + test_cancel_after_push_delivery(); return boost::report_errors(); } From 70f4b42cf046fb0d5916fac1f7bc485ff2735172 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 3 Oct 2025 11:24:00 +0200 Subject: [PATCH 37/43] adjust yield numbers --- include/boost/redis/impl/reader_fsm.ipp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/boost/redis/impl/reader_fsm.ipp b/include/boost/redis/impl/reader_fsm.ipp index 0d8a0cef..aca75069 100644 --- a/include/boost/redis/impl/reader_fsm.ipp +++ b/include/boost/redis/impl/reader_fsm.ipp @@ -38,7 +38,7 @@ reader_fsm::action reader_fsm::resume( } // Read - BOOST_REDIS_YIELD(resume_point_, 3, next_read_type_) + BOOST_REDIS_YIELD(resume_point_, 1, next_read_type_) // Process the bytes read, even if there was an error mpx_->commit_read(bytes_read); @@ -73,7 +73,7 @@ reader_fsm::action reader_fsm::resume( } if (res_.first == consume_result::got_push) { - BOOST_REDIS_YIELD(resume_point_, 6, action::type::notify_push_receiver, res_.second) + BOOST_REDIS_YIELD(resume_point_, 2, action::type::notify_push_receiver, res_.second) if (ec) { return {action::type::done, 0u, ec}; } From b8cdb555b396b4b90437bf6b25468af2aabe5eb7 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 3 Oct 2025 11:24:52 +0200 Subject: [PATCH 38/43] Remove unused parameter comment --- include/boost/redis/detail/reader_fsm.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/boost/redis/detail/reader_fsm.hpp b/include/boost/redis/detail/reader_fsm.hpp index 7d2c6b0c..ed6b6ece 100644 --- a/include/boost/redis/detail/reader_fsm.hpp +++ b/include/boost/redis/detail/reader_fsm.hpp @@ -38,7 +38,7 @@ class reader_fsm { action resume( std::size_t bytes_read, system::error_code ec, - asio::cancellation_type_t /*cancel_state*/); + asio::cancellation_type_t cancel_state); private: int resume_point_{0}; From 974c00fe9051843f3fde9479eef48adba55d3805 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 3 Oct 2025 11:37:50 +0200 Subject: [PATCH 39/43] Fix incorrect piece of docs --- include/boost/redis/connection.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 32b85c85..c93cbdab 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -792,8 +792,8 @@ class basic_connection { * @li `asio::cancellation_type_t::terminal`. * @li `asio::cancellation_type_t::partial`. * - * In both cases, cancellation is equivalent to calling @ref basic_connection::cancel, - * without arguments. + * In both cases, cancellation is equivalent to calling @ref basic_connection::cancel + * passing @ref operation::run as argument. * * For example on how to call this function refer to * cpp20_intro.cpp or any other example. From 39fab320f153a6fe15efa72b942cbb0780ae219f Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 3 Oct 2025 12:12:49 +0200 Subject: [PATCH 40/43] Typo --- include/boost/redis/connection.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index c93cbdab..b77db1bc 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -912,7 +912,7 @@ class basic_connection { return impl_->receive_channel_.async_receive(std::forward(token)); } - /** @brief Receives server> pushes synchronously without blocking. + /** @brief Receives server pushes synchronously without blocking. * * Receives a server push synchronously by calling `try_receive` on * the underlying channel. If the operation fails because From 066926ed4d6a3ab133435ad097d59106ea7fc177 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 3 Oct 2025 13:47:01 +0200 Subject: [PATCH 41/43] Move fn to ipp --- include/boost/redis/connection.hpp | 24 ++-------------------- include/boost/redis/impl/connection.ipp | 27 +++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 22 deletions(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index b77db1bc..60dee3ed 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -416,32 +416,12 @@ inline system::error_code check_config(const config& cfg) return system::error_code{}; } -inline system::error_code translate_parallel_group_errors( +system::error_code translate_parallel_group_errors( std::array order, system::error_code setup_ec, system::error_code health_check_ec, system::error_code reader_ec, - system::error_code writer_ec) -{ - // The setup request is special: it might complete successfully, - // without causing the other tasks to exit. - // The other tasks will always complete with an error. - - // If the setup task errored and was the first to exit, use its code - if (order[0] == 0u && setup_ec) { - return setup_ec; - } - - // Use the code corresponding to the task that finished first, - // excluding the setup task - std::size_t task_number = order[0] == 0u ? order[1] : order[0]; - switch (task_number) { - case 1u: return health_check_ec; - case 2u: return reader_ec; - case 3u: return writer_ec; - default: BOOST_ASSERT(false); return system::error_code(); - } -} + system::error_code writer_ec); template class run_op { diff --git a/include/boost/redis/impl/connection.ipp b/include/boost/redis/impl/connection.ipp index 088b1ac3..9d8fe219 100644 --- a/include/boost/redis/impl/connection.ipp +++ b/include/boost/redis/impl/connection.ipp @@ -21,6 +21,33 @@ logger detail::make_stderr_logger(logger::level lvl, std::string prefix) }); } +inline system::error_code detail::translate_parallel_group_errors( + std::array order, + system::error_code setup_ec, + system::error_code health_check_ec, + system::error_code reader_ec, + system::error_code writer_ec) +{ + // The setup request is special: it might complete successfully, + // without causing the other tasks to exit. + // The other tasks will always complete with an error. + + // If the setup task errored and was the first to exit, use its code + if (order[0] == 0u && setup_ec) { + return setup_ec; + } + + // Use the code corresponding to the task that finished first, + // excluding the setup task + std::size_t task_number = order[0] == 0u ? order[1] : order[0]; + switch (task_number) { + case 1u: return health_check_ec; + case 2u: return reader_ec; + case 3u: return writer_ec; + default: BOOST_ASSERT(false); return system::error_code(); + } +} + connection::connection(executor_type ex, asio::ssl::context ctx, logger lgr) : impl_{std::move(ex), std::move(ctx), std::move(lgr)} { } From 38a3aaa55ee6300713b92bb6a6e44706b1235f83 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 3 Oct 2025 16:26:22 +0200 Subject: [PATCH 42/43] Nte on lifetimes --- include/boost/redis/connection.hpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 60dee3ed..3d38b255 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -775,6 +775,11 @@ class basic_connection { * In both cases, cancellation is equivalent to calling @ref basic_connection::cancel * passing @ref operation::run as argument. * + * After the operation completes, the token's associated cancellation slot + * may still have a cancellation handler associated to this connection. + * You should make sure to not invoke it after the connection has been destroyed. + * This is consistent with what other Asio I/O objects do. + * * For example on how to call this function refer to * cpp20_intro.cpp or any other example. * From 2bdb8b61a9b36731f1f6f9ba2cc1a41161830835 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 3 Oct 2025 18:26:06 +0200 Subject: [PATCH 43/43] Remove erroneous inline keyword --- include/boost/redis/impl/connection.ipp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/boost/redis/impl/connection.ipp b/include/boost/redis/impl/connection.ipp index 9d8fe219..b23cc8b4 100644 --- a/include/boost/redis/impl/connection.ipp +++ b/include/boost/redis/impl/connection.ipp @@ -21,7 +21,7 @@ logger detail::make_stderr_logger(logger::level lvl, std::string prefix) }); } -inline system::error_code detail::translate_parallel_group_errors( +system::error_code detail::translate_parallel_group_errors( std::array order, system::error_code setup_ec, system::error_code health_check_ec,