Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
810da82
Initial impl
anarthal Sep 26, 2025
744a56e
Merge branch 'develop' into feature/async-run-cancellation
anarthal Sep 27, 2025
20e8f76
Remove connection cancellations from reader fsm
anarthal Sep 27, 2025
ac2b6cd
Proper handling of cancellation in reader_fsm
anarthal Sep 27, 2025
b81b80a
writer_op cancellations
anarthal Sep 27, 2025
0b620ef
Merge branch 'develop' into feature/async-run-cancellation
anarthal Oct 1, 2025
02166eb
Parallel group handling
anarthal Oct 1, 2025
856e2ff
Fix async_compose
anarthal Oct 1, 2025
f80e465
Remove action from logger
anarthal Oct 1, 2025
1d1be62
Fix test_reader_fsm
anarthal Oct 1, 2025
2392e30
Adjust cancel
anarthal Oct 1, 2025
33058f9
Fix problem with zero ping timeouts
anarthal Oct 1, 2025
690b2cd
Fix UNIX socket reconnection
anarthal Oct 1, 2025
860f53b
Remove race in UNIX socket test
anarthal Oct 1, 2025
87dba03
Fixed bug handling parallel group result
anarthal Oct 1, 2025
c76267d
Improve push test
anarthal Oct 1, 2025
efe0b93
Fix test_conn_exec_retry
anarthal Oct 2, 2025
d2f85be
Rename test
anarthal Oct 2, 2025
0ac54ab
Failing error code test
anarthal Oct 2, 2025
43bb37e
Proper error code translation in PING
anarthal Oct 2, 2025
7e79751
PING disabled test
anarthal Oct 2, 2025
74a2cec
Remove TODO in redis_stream
anarthal Oct 2, 2025
96e6691
Cleanup TLS reconnection test
anarthal Oct 2, 2025
7c1f98b
Proper per-operation/cancel function bridging
anarthal Oct 2, 2025
c6ba6b3
Test terminal run cancellation
anarthal Oct 2, 2025
c29525b
Test partial cancellation
anarthal Oct 2, 2025
eefc962
Docs
anarthal Oct 2, 2025
3928762
async_receive cancellation docs
anarthal Oct 2, 2025
52a12be
operation deprecations
anarthal Oct 3, 2025
2a50e2e
Adjust to docs
anarthal Oct 3, 2025
7fa8438
Minor doc corrections
anarthal Oct 3, 2025
286ddcb
Move FSM test fn
anarthal Oct 3, 2025
7aaacf0
Remove unused macros
anarthal Oct 3, 2025
4656f8d
Remove TODO comments
anarthal Oct 3, 2025
b603f0e
Remove setup cancellation action
anarthal Oct 3, 2025
cc95af0
Reorder
anarthal Oct 3, 2025
d8818dc
Cancel after read
anarthal Oct 3, 2025
cb23d2b
Push delivery cancel test
anarthal Oct 3, 2025
70f4b42
adjust yield numbers
anarthal Oct 3, 2025
b8cdb55
Remove unused parameter comment
anarthal Oct 3, 2025
974c00f
Fix incorrect piece of docs
anarthal Oct 3, 2025
39fab32
Typo
anarthal Oct 3, 2025
066926e
Move fn to ipp
anarthal Oct 3, 2025
38a3aaa
Nte on lifetimes
anarthal Oct 3, 2025
2bdb8b6
Remove erroneous inline keyword
anarthal Oct 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 160 additions & 100 deletions include/boost/redis/connection.hpp

Large diffs are not rendered by default.

12 changes: 0 additions & 12 deletions include/boost/redis/detail/helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,6 @@ auto is_cancelled(T const& self)
return self.get_cancellation_state().cancelled() != asio::cancellation_type_t::none;
}

#define BOOST_REDIS_CHECK_OP0(X) \
if (ec || redis::detail::is_cancelled(self)) { \
X self.complete(!!ec ? ec : asio::error::operation_aborted); \
return; \
}

#define BOOST_REDIS_CHECK_OP1(X) \
if (ec || redis::detail::is_cancelled(self)) { \
X self.complete(!!ec ? ec : asio::error::operation_aborted, {}); \
return; \
}

} // namespace boost::redis::detail

#endif // BOOST_REDIS_HELPER_HPP
7 changes: 2 additions & 5 deletions include/boost/redis/detail/reader_fsm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@ class reader_fsm {
struct action {
enum class type
{
setup_cancellation,
read_some,
needs_more,
notify_push_receiver,
cancel_run,
done,
};

type type_ = type::setup_cancellation;
type type_ = type::done;
std::size_t push_size_ = 0u;
system::error_code ec_ = {};
};
Expand All @@ -40,11 +38,10 @@ class reader_fsm {
action resume(
std::size_t bytes_read,
system::error_code ec,
asio::cancellation_type_t /*cancel_state*/);
asio::cancellation_type_t cancel_state);

private:
int resume_point_{0};
action action_after_resume_;
action::type next_read_type_ = action::type::read_some;
multiplexer* mpx_ = nullptr;
std::pair<consume_result, std::size_t> res_{consume_result::needs_more, 0u};
Expand Down
23 changes: 10 additions & 13 deletions include/boost/redis/detail/redis_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ class redis_stream {

if (obj.transport_ == transport_type::unix_socket) {
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
// Discard any existing state
obj.unix_socket_.close(ec);

// Directly connect to the socket
BOOST_ASIO_CORO_YIELD
obj.unix_socket_.async_connect(
Expand All @@ -119,7 +122,9 @@ class redis_stream {
} else {
// ssl::stream doesn't support being re-used. If we're to use
// TLS and the stream has been used, re-create it.
// Must be done before anything else is done on the stream
// Must be done before anything else is done on the stream.
// Note that we don't need to close the socket here because
// range connect does it for us.
if (cfg->use_ssl && obj.ssl_stream_used_)
obj.reset_stream();

Expand Down Expand Up @@ -269,19 +274,11 @@ class redis_stream {
}
}

// Cleanup
// Cancels resolve operations. Resolve operations don't support per-operation
// cancellation, but resolvers have a cancel() function. Resolve operations are
// in general blocking and run in a separate thread. cancel() has effect only
// if the operation hasn't started yet. Still, trying is better than nothing
void cancel_resolve() { resolv_.cancel(); }

void close()
{
system::error_code ec;
if (stream_.next_layer().is_open())
stream_.next_layer().close(ec);
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
if (unix_socket_.is_open())
unix_socket_.close(ec);
#endif
}
};

} // namespace detail
Expand Down
27 changes: 27 additions & 0 deletions include/boost/redis/impl/connection.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,33 @@ logger detail::make_stderr_logger(logger::level lvl, std::string prefix)
});
}

system::error_code detail::translate_parallel_group_errors(
std::array<std::size_t, 4u> order,
system::error_code setup_ec,
system::error_code health_check_ec,
system::error_code reader_ec,
system::error_code writer_ec)
{
// The setup request is special: it might complete successfully,
// without causing the other tasks to exit.
// The other tasks will always complete with an error.

// If the setup task errored and was the first to exit, use its code
if (order[0] == 0u && setup_ec) {
return setup_ec;
}

// Use the code corresponding to the task that finished first,
// excluding the setup task
std::size_t task_number = order[0] == 0u ? order[1] : order[0];
switch (task_number) {
case 1u: return health_check_ec;
case 2u: return reader_ec;
case 3u: return writer_ec;
default: BOOST_ASSERT(false); return system::error_code();
}
}

connection::connection(executor_type ex, asio::ssl::context ctx, logger lgr)
: impl_{std::move(ex), std::move(ctx), std::move(lgr)}
{ }
Expand Down
2 changes: 0 additions & 2 deletions include/boost/redis/impl/connection_logger.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ namespace boost::redis::detail {
auto to_string(reader_fsm::action::type t) noexcept -> char const*
{
switch (t) {
BOOST_REDIS_READER_SWITCH_CASE(setup_cancellation);
BOOST_REDIS_READER_SWITCH_CASE(read_some);
BOOST_REDIS_READER_SWITCH_CASE(needs_more);
BOOST_REDIS_READER_SWITCH_CASE(notify_push_receiver);
BOOST_REDIS_READER_SWITCH_CASE(cancel_run);
BOOST_REDIS_READER_SWITCH_CASE(done);
default: return "action::type::<invalid type>";
}
Expand Down
47 changes: 31 additions & 16 deletions include/boost/redis/impl/reader_fsm.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -8,49 +8,63 @@
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/detail/reader_fsm.hpp>

#include <boost/asio/cancellation_type.hpp>
#include <boost/asio/error.hpp>

namespace boost::redis::detail {

inline bool is_terminal_cancel(asio::cancellation_type_t value)
{
return (value & asio::cancellation_type_t::terminal) != asio::cancellation_type_t::none;
}

reader_fsm::reader_fsm(multiplexer& mpx) noexcept
: mpx_{&mpx}
{ }

reader_fsm::action reader_fsm::resume(
std::size_t bytes_read,
system::error_code ec,
asio::cancellation_type_t /*cancel_state*/)
asio::cancellation_type_t cancel_state)
{
switch (resume_point_) {
BOOST_REDIS_CORO_INITIAL
BOOST_REDIS_YIELD(resume_point_, 1, action::type::setup_cancellation)

for (;;) {
// Prepare the buffer for the read operation
ec = mpx_->prepare_read();
if (ec) {
action_after_resume_ = {action::type::done, 0, ec};
BOOST_REDIS_YIELD(resume_point_, 2, action::type::cancel_run)
return action_after_resume_;
return {action::type::done, 0, ec};
}

BOOST_REDIS_YIELD(resume_point_, 3, next_read_type_)
// Read
BOOST_REDIS_YIELD(resume_point_, 1, next_read_type_)

// Process the bytes read, even if there was an error
mpx_->commit_read(bytes_read);

// Check for read errors
if (ec) {
// TODO: If an error occurred but data was read (i.e.
// bytes_read != 0) we should try to process that data and
// deliver it to the user before calling cancel_run.
action_after_resume_ = {action::type::done, bytes_read, ec};
BOOST_REDIS_YIELD(resume_point_, 4, action::type::cancel_run)
return action_after_resume_;
return {action::type::done, bytes_read, ec};
}

// Check for cancellations
if (is_terminal_cancel(cancel_state)) {
return {action::type::done, 0u, asio::error::operation_aborted};
}

// Process the data that we've read
next_read_type_ = action::type::read_some;
while (mpx_->get_read_buffer_size() != 0) {
res_ = mpx_->consume(ec);

if (ec) {
// TODO: Perhaps log what has not been consumed to aid
// debugging.
action_after_resume_ = {action::type::done, res_.second, ec};
BOOST_REDIS_YIELD(resume_point_, 5, action::type::cancel_run)
return action_after_resume_;
return {action::type::done, res_.second, ec};
}

if (res_.first == consume_result::needs_more) {
Expand All @@ -59,11 +73,12 @@ reader_fsm::action reader_fsm::resume(
}

if (res_.first == consume_result::got_push) {
BOOST_REDIS_YIELD(resume_point_, 6, action::type::notify_push_receiver, res_.second)
BOOST_REDIS_YIELD(resume_point_, 2, action::type::notify_push_receiver, res_.second)
if (ec) {
action_after_resume_ = {action::type::done, 0u, ec};
BOOST_REDIS_YIELD(resume_point_, 7, action::type::cancel_run)
return action_after_resume_;
return {action::type::done, 0u, ec};
}
if (is_terminal_cancel(cancel_state)) {
return {action::type::done, 0u, asio::error::operation_aborted};
}
} else {
// TODO: Here we should notify the exec operation that
Expand Down
56 changes: 51 additions & 5 deletions include/boost/redis/operation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,68 @@ namespace boost::redis {
*/
enum class operation
{
/// Resolve operation.
/**
* @brief (Deprecated) Resolve operation.
*
* Cancelling a single resolve operation is probably not what you
* want, since there is no way to detect when a connection is performing name resolution.
* Use @ref operation::run to cancel the current @ref basic_connection::async_run operation,
* which includes name resolution.
*/
resolve,
/// Connect operation.

/**
* @brief (Deprecated) Connect operation.
*
* Cancelling a single connect operation is probably not what you
* want, since there is no way to detect when a connection is performing a connect operation.
* Use @ref operation::run to cancel the current @ref basic_connection::async_run operation,
* which includes connection establishment.
*/
connect,
/// SSL handshake operation.

/**
* @brief (Deprecated) SSL handshake operation.
*
* Cancelling a single connect operation is probably not what you
* want, since there is no way to detect when a connection is performing an SSL handshake.
* Use @ref operation::run to cancel the current @ref basic_connection::async_run operation,
* which includes the SSL handshake.
*/
ssl_handshake,

/// Refers to `connection::async_exec` operations.
exec,

/// Refers to `connection::async_run` operations.
run,

/// Refers to `connection::async_receive` operations.
receive,
/// Cancels reconnection.

/**
* @brief (Deprecated) Cancels reconnection.
*
* Cancelling reconnection doesn't really cancel anything.
* It will only prevent further connections attempts from being
* made once the current connection encounters an error.
*
* Use @ref operation::run to cancel the current @ref basic_connection::async_run operation,
* which includes reconnection. If you want to disable reconnection completely,
* set @ref config::reconnect_wait_interval to zero before calling `async_run`.
*/
reconnection,
/// Health check operation.

/**
* @brief (Deprecated) Health check operation.
*
* Cancelling the health checker only is probably not what you want.
* Use @ref operation::run to cancel the current @ref basic_connection::async_run operation,
* which includes the health checker. If you want to disable health checks completely,
* set @ref config::health_check_interval to zero before calling `async_run`.
*/
health_check,

/// Refers to all operations.
all,
};
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ make_test(test_conn_quit)
make_test(test_conn_exec_retry)
make_test(test_conn_exec_error)
make_test(test_run)
make_test(test_conn_run_cancel)
make_test(test_conn_check_health)
make_test(test_conn_exec)
make_test(test_conn_push)
Expand Down
Loading