diff --git a/src/v/cluster/BUILD b/src/v/cluster/BUILD index e85a48e8859a5..4d99ea5a54d63 100644 --- a/src/v/cluster/BUILD +++ b/src/v/cluster/BUILD @@ -318,6 +318,7 @@ redpanda_cc_library( "cluster_recovery_reconciler.cc", "cluster_recovery_table.cc", "cluster_utils.cc", + "config/cluster_config.cc", "config_frontend.cc", "config_manager.cc", "controller.cc", @@ -486,6 +487,8 @@ redpanda_cc_library( "cluster_utils.h", "cluster_uuid.h", "commands.h", + "config/cluster_config.h", + "config/config_i.h", "config_frontend.h", "config_manager.h", "controller.h", diff --git a/src/v/cluster/config/cluster_config.cc b/src/v/cluster/config/cluster_config.cc new file mode 100644 index 0000000000000..ddd3030eef677 --- /dev/null +++ b/src/v/cluster/config/cluster_config.cc @@ -0,0 +1,20 @@ + +#include "cluster/config/cluster_config.h" + +#include "cluster/config_manager.h" +#include "cluster/types.h" + +namespace cluster { +cluster_config::cluster_config( + ss::sharded& config_manager) noexcept + : _config_manager(config_manager) {} + +std::expected +cluster_config::get_config() const noexcept { + vassert( + ss::this_shard_id() == cluster::config_manager::shard, + "This operation can only be invoked on the config_manager shard"); + return _config_manager.local().get_version(); +} + +} // namespace cluster diff --git a/src/v/cluster/config/cluster_config.h b/src/v/cluster/config/cluster_config.h new file mode 100644 index 0000000000000..1ca1a1d1854f1 --- /dev/null +++ b/src/v/cluster/config/cluster_config.h @@ -0,0 +1,19 @@ + +#pragma once + +#include "cluster/config/config_i.h" +#include "cluster/config_manager.h" + +namespace cluster { + +class cluster_config : public config_i { +public: + explicit cluster_config( + ss::sharded& config_manager) noexcept; + std::expected + get_config() const noexcept override; + +private: + ss::sharded& _config_manager; +}; +} // namespace cluster diff --git a/src/v/cluster/config/config_i.h b/src/v/cluster/config/config_i.h new file mode 
100644 index 0000000000000..aa04d91c6dd84 --- /dev/null +++ b/src/v/cluster/config/config_i.h @@ -0,0 +1,18 @@ + +#pragma once + +#include "cluster/types.h" + +#include + +namespace cluster { + +// interface for cluster configuration +class config_i { +public: + virtual std::expected + get_config() const noexcept = 0; + + virtual ~config_i() = default; +}; +} // namespace cluster diff --git a/src/v/cluster/config/test/BUILD b/src/v/cluster/config/test/BUILD new file mode 100644 index 0000000000000..8ff5fa2bcc955 --- /dev/null +++ b/src/v/cluster/config/test/BUILD @@ -0,0 +1,12 @@ +load("//bazel:build.bzl", "redpanda_cc_library") + +redpanda_cc_library( + name = "test_config", + hdrs = [ + "test_config.h", + ], + visibility = ["//visibility:public"], + deps = [ + "//src/v/cluster", + ], +) diff --git a/src/v/cluster/config/test/test_config.h b/src/v/cluster/config/test/test_config.h new file mode 100644 index 0000000000000..1f8df92fc55a1 --- /dev/null +++ b/src/v/cluster/config/test/test_config.h @@ -0,0 +1,31 @@ + +#pragma once + +#include "cluster/config/config_i.h" +#include "cluster/types.h" + +namespace cluster::test { +class test_config : public config_i { +public: + using functor_t + = std::function()>; + + explicit test_config(functor_t function) noexcept + : _function(std::move(function)) {} + + std::expected + get_config() const noexcept override { + return _function(); + } + + static inline std::unique_ptr make_default() { + return std::make_unique([] { + return std::expected{ + config_version{0}}; + }); + } + +private: + functor_t _function; +}; +} // namespace cluster::test diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc index e2ee1685514ce..040c8b7ce5e52 100644 --- a/src/v/cluster/controller.cc +++ b/src/v/cluster/controller.cc @@ -27,6 +27,7 @@ #include "cluster/cluster_link/table.h" #include "cluster/cluster_recovery_table.h" #include "cluster/cluster_utils.h" +#include "cluster/config/cluster_config.h" #include 
"cluster/config_frontend.h" #include "cluster/controller_api.h" #include "cluster/controller_backend.h" @@ -227,7 +228,10 @@ ss::future<> controller::wire_up() { std::ref(_tp_state), std::ref(_members_table), std::ref(_partition_allocator), - std::ref(_node_status_table)); + std::ref(_node_status_table), + ss::sharded_parameter([this] { + return std::make_unique(_config_manager); + })); }) .then([this] { return _shard_placement.start( @@ -727,7 +731,11 @@ ss::future<> controller::start( std::ref(_drain_manager), std::ref(_feature_table), std::ref(_partition_leaders), - std::ref(_tp_state)); + std::ref(_tp_state), + std::ref(_node_status_table), + ss::sharded_parameter([this]() { + return std::make_unique(_config_manager); + })); _leader_balancer = std::make_unique( _tp_state.local(), @@ -798,6 +806,8 @@ ss::future<> controller::start( std::ref(_members_frontend), config::shard_local_cfg() .partition_autobalancing_node_availability_timeout_sec.bind(), + config::shard_local_cfg() + .partition_autobalancing_node_autodecommission_time.bind(), config::shard_local_cfg() .partition_autobalancing_max_disk_usage_percent.bind(), config::shard_local_cfg().partition_autobalancing_tick_interval_ms.bind(), diff --git a/src/v/cluster/health_monitor_backend.cc b/src/v/cluster/health_monitor_backend.cc index 20414e105ef22..a2873ad8373a8 100644 --- a/src/v/cluster/health_monitor_backend.cc +++ b/src/v/cluster/health_monitor_backend.cc @@ -20,8 +20,10 @@ #include "cluster/logger.h" #include "cluster/members_table.h" #include "cluster/node/local_monitor.h" +#include "cluster/node_status_table.h" #include "cluster/partition_manager.h" #include "cluster/partition_probe.h" +#include "cluster/types.h" #include "config/configuration.h" #include "config/property.h" #include "container/chunked_hash_map.h" @@ -47,6 +49,7 @@ #include #include +#include #include #include #include @@ -67,7 +70,9 @@ health_monitor_backend::health_monitor_backend( ss::sharded& drain_manager, ss::sharded& 
feature_table, ss::sharded& partition_leaders_table, - ss::sharded& topic_table) + ss::sharded& topic_table, + ss::sharded& node_status_table, + std::unique_ptr config) : _raft0(std::move(raft0)) , _members(mt) , _connections(connections) @@ -78,6 +83,8 @@ health_monitor_backend::health_monitor_backend( , _feature_table(feature_table) , _partition_leaders_table(partition_leaders_table) , _topic_table(topic_table) + , _node_status_table(node_status_table) + , _config(std::move(config)) , _reports{ss::make_lw_shared()} , _local_monitor(local_monitor) , _self(_raft0->self().id()) {} @@ -199,7 +206,11 @@ std::optional health_monitor_backend::build_node_report( } node_health_report ret{ - it->second->id, it->second->local_state, {}, it->second->drain_status}; + it->second->id, + it->second->local_state, + {}, + it->second->drain_status, + it->second->maybe_auto_decommission_status}; ret.local_state.logical_version = features::feature_table::get_latest_logical_version(); ret.topics = filter_topic_status(it->second->topics, f.ntp_filters); @@ -888,13 +899,18 @@ health_monitor_backend::collect_current_node_health() { auto drain_status = co_await _drain_manager.local().status(); auto topics = co_await collect_topic_status(); + auto maybe_auto_decommission_status = collect_auto_decommission_status(); auto [it, _] = _status.try_emplace(id); it->second.is_alive = alive::yes; it->second.last_reply_timestamp = ss::lowres_clock::now(); co_return node_health_report{ - id, std::move(local_state), std::move(topics), std::move(drain_status)}; + id, + std::move(local_state), + std::move(topics), + drain_status, + std::move(maybe_auto_decommission_status)}; } ss::future> health_monitor_backend::get_current_node_health() { @@ -1024,6 +1040,7 @@ reports_acc_t reduce_reports_map(reports_acc_t acc, shard_report shard_report) { return acc; } } // namespace + ss::future> health_monitor_backend::collect_topic_status() { auto reports_map = co_await _partition_manager.map_reduce0( @@ -1040,6 
+1057,76 @@ health_monitor_backend::collect_topic_status() { co_return topics; } +namespace { +// small helper for getting the boot time of the redpanda application +ss::lowres_clock::time_point get_boot_time() { + return rpc::clock_type::now() + - std::chrono::duration_cast( + ss::engine().uptime()); +} +} // namespace + +std::optional +health_monitor_backend::collect_auto_decommission_status() { + const auto timeout_duration + = config::shard_local_cfg() + .partition_autobalancing_node_autodecommission_time(); + + auto maybe_config_version = _config->get_config(); + vassert( + maybe_config_version, + "current implementation of config_i::get_config should never return an " + "error"); + auto config_version = *maybe_config_version; + // bounce to static method + return do_collect_auto_decommission_status( + {.timeout_duration = timeout_duration, + .now = rpc::clock_type::now(), + .default_last_seen = get_boot_time(), + .nodes = _members.local().node_ids(), + .node_status_getter = get_node_status_t{[this](model::node_id node_id) { + return _node_status_table.local().get_node_status(node_id); + }}, + .current_config_version = config_version}); +} + +std::optional +health_monitor_backend::do_collect_auto_decommission_status( + const do_collect_auto_decommission_status_params& params) { + // gather the list of all nodes past the configured auto decom time + // package that with the current configuration version s.t. 
a decision is + // guaranteed to be made with the same decom timeout + + auto all_past_last_seen + = params.nodes | // get last seen, if missing clock min + std::ranges::views::transform( + [¶ms](const model::node_id node) -> node_status { + return params.node_status_getter(node).value_or( + node_status{ + .node_id = node, .last_seen = params.default_last_seen}); + }) + | // filter to node statuses that are above timeout duration + std::ranges::views::filter([¶ms](const node_status status) { + return params.now - status.last_seen > params.timeout_duration; + }) + | // grab the node ids + std::ranges::views::transform( + [](const node_status status) { return status.node_id; }) + | // to vector + std::ranges::to>(); + + if (all_past_last_seen.empty()) [[likely]] { + // the overwhelming majority of the time the list should be empty + return std::nullopt; + } + + auto_decommission_status status{ + {.configuration_version = params.current_config_version, + .nodes_past_auto_decom_timeout = std::move(all_past_last_seen)}}; + + return status; +} + std::chrono::milliseconds health_monitor_backend::max_metadata_age() { return config::shard_local_cfg().health_monitor_max_metadata_age(); } diff --git a/src/v/cluster/health_monitor_backend.h b/src/v/cluster/health_monitor_backend.h index ddce0682c5853..5f73cfdaca94e 100644 --- a/src/v/cluster/health_monitor_backend.h +++ b/src/v/cluster/health_monitor_backend.h @@ -11,11 +11,14 @@ */ #pragma once #include "absl/container/node_hash_map.h" +#include "cluster/config/config_i.h" #include "cluster/fwd.h" #include "cluster/health_monitor_types.h" #include "cluster/node/local_monitor.h" +#include "cluster/node_status_table.h" #include "cluster/notification.h" #include "features/feature_table.h" +#include "model/fundamental.h" #include "model/metadata.h" #include "rpc/fwd.h" #include "ssx/semaphore.h" @@ -71,7 +74,9 @@ class health_monitor_backend { ss::sharded&, ss::sharded&, ss::sharded&, - ss::sharded&); + ss::sharded&, + 
ss::sharded&, + std::unique_ptr); ss::future<> stop(); @@ -165,6 +170,22 @@ class health_monitor_backend { ss::future> collect_topic_status(); + // get the status info of all nodes which are past auto decommission timeout + std::optional collect_auto_decommission_status(); + using get_node_status_t + = ss::noncopyable_function(model::node_id)>; + struct do_collect_auto_decommission_status_params { + std::chrono::seconds timeout_duration; + rpc::clock_type::time_point now; + rpc::clock_type::time_point default_last_seen; + std::vector nodes; + get_node_status_t node_status_getter; + config_version current_config_version; + }; + static std::optional + do_collect_auto_decommission_status( + const do_collect_auto_decommission_status_params& params); + result process_node_reply(model::node_id, result); @@ -232,6 +253,8 @@ class health_monitor_backend { ss::sharded& _feature_table; ss::sharded& _partition_leaders_table; ss::sharded& _topic_table; + ss::sharded& _node_status_table; + std::unique_ptr _config; ss::lowres_clock::time_point _last_refresh; ss::lw_shared_ptr _refresh_request; diff --git a/src/v/cluster/health_monitor_types.cc b/src/v/cluster/health_monitor_types.cc index d67917a52fb37..641a777d5ceff 100644 --- a/src/v/cluster/health_monitor_types.cc +++ b/src/v/cluster/health_monitor_types.cc @@ -75,14 +75,32 @@ std::ostream& operator<<(std::ostream& o, const node_state& s) { return o; } +std::ostream& operator<<(std::ostream& o, const auto_decommission_status& ads) { + fmt::print( + o, + "{{config_version: {}, nodes_past_auto_decom_timeout: {}}}", + ads.configuration_version, + ads.nodes_past_auto_decom_timeout); + return o; +} + +bool operator==( + const auto_decommission_status& a, const auto_decommission_status& b) { + return a.configuration_version == b.configuration_version + && std::ranges::equal( + a.nodes_past_auto_decom_timeout, b.nodes_past_auto_decom_timeout); +} + node_health_report::node_health_report( model::node_id id, node::local_state 
local_state, chunked_vector topics_vec, - std::optional drain_status) + std::optional drain_status, + std::optional maybe_auto_decommission_status) : id(id) , local_state(std::move(local_state)) - , drain_status(drain_status) { + , drain_status(drain_status) + , maybe_auto_decommission_status(std::move(maybe_auto_decommission_status)) { topics.reserve(topics_vec.size()); for (auto& topic : topics_vec) { topics.emplace( @@ -91,7 +109,8 @@ node_health_report::node_health_report( } node_health_report node_health_report::copy() const { - node_health_report ret{id, local_state, {}, drain_status}; + node_health_report ret{ + id, local_state, {}, drain_status, maybe_auto_decommission_status}; ret.topics.reserve(topics.bucket_count()); for (const auto& [tp_ns, partitions] : topics) { ret.topics.emplace(tp_ns, copy_partition_statuses(partitions)); @@ -104,7 +123,12 @@ std::ostream& operator<<(std::ostream& o, const node_health_report& r) { } node_health_report_serde::node_health_report_serde(const node_health_report& hr) - : node_health_report_serde(hr.id, hr.local_state, {}, hr.drain_status) { + : node_health_report_serde( + hr.id, + hr.local_state, + /* topics */ {}, + hr.drain_status, + hr.maybe_auto_decommission_status) { topics.reserve(hr.topics.size()); for (const auto& [tp_ns, partitions] : hr.topics) { topics.emplace_back(tp_ns, copy_to_vector(partitions)); @@ -154,11 +178,13 @@ partition_statuses_map_t copy_to_map(const partition_statuses_t& ps_vec) { std::ostream& operator<<(std::ostream& o, const node_health_report_serde& r) { fmt::print( o, - "{{id: {}, topics: {}, local_state: {}, drain_status: {}}}", + "{{id: {}, topics: {}, local_state: {}, drain_status: {}, " + "maybe_auto_decommission_status {}}}", r.id, r.topics, r.local_state, - r.drain_status); + r.drain_status, + r.maybe_auto_decommission_status); return o; } @@ -171,7 +197,9 @@ bool operator==( a.topics.cbegin(), a.topics.cend(), b.topics.cbegin(), - b.topics.cend()); + b.topics.cend()) + && 
a.maybe_auto_decommission_status + == b.maybe_auto_decommission_status; } std::ostream& operator<<(std::ostream& o, const cluster_health_report& r) { diff --git a/src/v/cluster/health_monitor_types.h b/src/v/cluster/health_monitor_types.h index ddcedda94eccb..faf0d8e557ee8 100644 --- a/src/v/cluster/health_monitor_types.h +++ b/src/v/cluster/health_monitor_types.h @@ -15,6 +15,7 @@ #include "cluster/drain_manager.h" #include "cluster/errc.h" #include "cluster/node/types.h" +#include "cluster/types.h" #include "container/chunked_hash_map.h" #include "model/fundamental.h" #include "model/metadata.h" @@ -196,6 +197,47 @@ struct topic_status auto serde_fields() { return std::tie(tp_ns, partitions); } }; +/** + * Status for the automatic decommissioning of dead nodes + */ + +struct auto_decommission_status_data { + // ensures that the auto decommission configuration is the same on the + // source node and controller + config_version configuration_version{config_version_unset}; + // a list of the node_ids which have exceeded the auto decommission timeout + std::vector nodes_past_auto_decom_timeout; +}; + +struct auto_decommission_status + : serde::envelope< + auto_decommission_status, + serde::version<0>, + serde::compat_version<0>> + , auto_decommission_status_data { + static constexpr int8_t current_version = 0; + + using auto_decommission_status_data::auto_decommission_status_data; + + // NOLINTNEXTLINE hicpp-explicit-conversions + auto_decommission_status(auto_decommission_status_data data) noexcept + : auto_decommission_status_data{std::move(data)} { + static_assert( + sizeof(auto_decommission_status_data) + == sizeof(auto_decommission_status)); + } + + friend std::ostream& + operator<<(std::ostream&, const auto_decommission_status&); + + auto serde_fields() { + return std::tie(configuration_version, nodes_past_auto_decom_timeout); + } + + friend bool operator==( + const auto_decommission_status& a, const auto_decommission_status& b); +}; + /** * Node health report 
is collected built based on node local state at given * instance of time @@ -211,12 +253,14 @@ struct node_health_report { node::local_state local_state; topics_t topics; std::optional drain_status; + std::optional maybe_auto_decommission_status; node_health_report( model::node_id, node::local_state, chunked_vector, - std::optional); + std::optional, + std::optional); node_health_report copy() const; @@ -240,9 +284,15 @@ struct node_health_report_serde node::local_state local_state; chunked_vector topics; std::optional drain_status; + std::optional maybe_auto_decommission_status; auto serde_fields() { - return std::tie(id, local_state, topics, drain_status); + return std::tie( + id, + local_state, + topics, + drain_status, + maybe_auto_decommission_status); } node_health_report_serde() = default; @@ -251,14 +301,22 @@ struct node_health_report_serde model::node_id id, node::local_state local_state, chunked_vector topics, - std::optional drain_status) + std::optional drain_status, + std::optional maybe_auto_decommission_status) : id(id) , local_state(std::move(local_state)) , topics(std::move(topics)) - , drain_status(drain_status) {} + , drain_status(drain_status) + , maybe_auto_decommission_status( + std::move(maybe_auto_decommission_status)) {} node_health_report_serde copy() const { - return {id, local_state, topics.copy(), drain_status}; + return { + id, + local_state, + topics.copy(), + drain_status, + maybe_auto_decommission_status}; } explicit node_health_report_serde(const node_health_report& hr); @@ -268,7 +326,8 @@ struct node_health_report_serde id, std::move(local_state), std::move(topics), - std::move(drain_status)}; + drain_status, + std::move(maybe_auto_decommission_status)}; } friend std::ostream& diff --git a/src/v/cluster/partition_balancer_backend.cc b/src/v/cluster/partition_balancer_backend.cc index 9c8c9e1d24b2e..aa60b0004ae6d 100644 --- a/src/v/cluster/partition_balancer_backend.cc +++ b/src/v/cluster/partition_balancer_backend.cc @@ -54,6 
+54,7 @@ partition_balancer_backend::partition_balancer_backend( ss::sharded& topics_frontend, ss::sharded& members_frontend, config::binding&& availability_timeout, + config::binding&& decommission_timeout, config::binding max_disk_usage_percent, config::binding&& tick_interval, config::binding&& max_concurrent_actions, @@ -76,6 +77,7 @@ partition_balancer_backend::partition_balancer_backend( features::license_required_feature:: partition_auto_balancing_continuous>()) , _availability_timeout(std::move(availability_timeout)) + , _decommission_timeout(std::move(decommission_timeout)) , _max_disk_usage_percent(std::move(max_disk_usage_percent)) , _tick_interval(std::move(tick_interval)) , _max_concurrent_actions(std::move(max_concurrent_actions)) @@ -448,7 +450,8 @@ ss::future<> partition_balancer_backend::do_tick() { "violations: unavailable nodes: {}, full nodes: {}; " "nodes to rebalance count: {}; on demand rebalance requested: {}; " "updates in progress: {}; " - "action counts: reassignments: {}, cancellations: {}, failed: {}; " + "action counts: reassignments: {}, cancellations: {}, nodes to " + "decommission: {}, failed: {}; " "counts rebalancing finished: {}, force refresh health report: {}", _cur_term->last_status, _cur_term->last_violations.unavailable_nodes.size(), @@ -458,6 +461,7 @@ ss::future<> partition_balancer_backend::do_tick() { _state.topics().updates_in_progress().size(), plan_data.reassignments.size(), plan_data.cancellations.size(), + plan_data.decommissions.size(), plan_data.failed_actions_count, plan_data.counts_rebalancing_finished, _cur_term->_force_health_report_refresh); @@ -549,6 +553,24 @@ ss::future<> partition_balancer_backend::do_tick() { _cur_term->last_tick_in_progress_updates = moves_before + plan_data.cancellations.size() + plan_data.reassignments.size(); + + for (auto node_to_decommission : plan_data.decommissions) { + vlog( + clusterlog.info, + "submitting decommission on unresponsive node: {}", + node_to_decommission); + + 
auto decom_error = co_await _members_frontend.decommission_node( + node_to_decommission); + + if (decom_error) { + vlog( + clusterlog.warn, + "node: {}, failed to decommission with error: {}", + node_to_decommission, + decom_error); + } + } } partition_balancer_overview_reply partition_balancer_backend::overview() const { diff --git a/src/v/cluster/partition_balancer_backend.h b/src/v/cluster/partition_balancer_backend.h index 59d82c4618765..0279492de9ef5 100644 --- a/src/v/cluster/partition_balancer_backend.h +++ b/src/v/cluster/partition_balancer_backend.h @@ -43,6 +43,7 @@ class partition_balancer_backend { ss::sharded&, ss::sharded&, config::binding&& availability_timeout, + config::binding&& decommission_timeout, config::binding max_disk_usage_percent, config::binding&& tick_interval, config::binding&& max_concurrent_actions, @@ -102,6 +103,7 @@ class partition_balancer_backend { config::enum_property> _mode; config::binding _availability_timeout; + config::binding _decommission_timeout; config::binding _max_disk_usage_percent; config::binding _tick_interval; config::binding _max_concurrent_actions; diff --git a/src/v/cluster/partition_balancer_planner.cc b/src/v/cluster/partition_balancer_planner.cc index e0e508d9c3ddd..326c524a8ded7 100644 --- a/src/v/cluster/partition_balancer_planner.cc +++ b/src/v/cluster/partition_balancer_planner.cc @@ -12,6 +12,7 @@ #include "base/vlog.h" #include "cluster/cluster_utils.h" +#include "cluster/health_monitor_types.h" #include "cluster/logger.h" #include "cluster/members_table.h" #include "cluster/node_status_table.h" @@ -23,6 +24,8 @@ #include "cluster/scheduling/types.h" #include "cluster/types.h" #include "container/chunked_hash_map.h" +#include "model/fundamental.h" +#include "model/metadata.h" #include "model/namespace.h" #include "random/generators.h" #include "ssx/sformat.h" @@ -33,6 +36,8 @@ #include #include +#include +#include namespace cluster { @@ -109,6 +114,7 @@ class 
partition_balancer_planner::request_context { absl::flat_hash_set all_unavailable_nodes; absl::flat_hash_set timed_out_unavailable_nodes; absl::flat_hash_set decommissioning_nodes; + absl::flat_hash_set maintenance_mode_nodes; absl::flat_hash_map node_disk_reports; ss::future<> for_each_partition( @@ -260,6 +266,9 @@ class partition_balancer_planner::request_context { _topic2node_counts; absl::node_hash_map _reassignments; absl::node_hash_map _force_reassignments; + // nodes to request to decommission, not nodes which are currently + // decommissioning + absl::flat_hash_set _nodes_to_be_decommed; size_t _failed_actions_count = 0; // we track missing partition size info separately as it requires force // refresh of health report @@ -323,6 +332,7 @@ void partition_balancer_planner::init_per_node_state( request_context& ctx, plan_data& result) { const auto now = rpc::clock_type::now(); + for (const auto& [id, broker] : _state.members().nodes()) { if ( broker.state.get_membership_state() @@ -338,6 +348,13 @@ void partition_balancer_planner::init_per_node_state( vlog(clusterlog.debug, "node {}: decommissioning", id); ctx.decommissioning_nodes.insert(id); } + + if ( + broker.state.get_maintenance_state() + == model::maintenance_state::active) { + ctx.maintenance_mode_nodes.insert(id); + } + auto node_status = _state.node_status().get_node_status(id); // node status is not yet available, wait for it to be updated if (!node_status) { @@ -2139,6 +2156,124 @@ partition_balancer_planner::get_force_repair_actions(request_context& ctx) { } } +absl::flat_hash_set +partition_balancer_planner::do_get_auto_decommission_actions( + const do_get_auto_decommission_actions_params& params) noexcept { + // rule: a quorum of nodes in the cluster must vote that a node has passed + // the current auto decommission timeout + size_t quorum = params.cluster_members.size() / 2 + 1; + + // given a map from the health report on node_id -> decom_report, filter to + // 1. 
only cluster members + // 2. only reports that have a value + // 3. only reports that have the current configuration version + // - this checks that we are working with the current decommission + // timeout + auto decom_reports + = params.node_to_decom + | std::ranges::views::filter( + [¶ms](const auto& node_and_decom_report) { + return + // only cluster members + params.cluster_members.contains(node_and_decom_report.first) + // only reports with values + && node_and_decom_report.second.get().has_value() + // only reports with the current configuration version + && node_and_decom_report.second.get() + .value() + .configuration_version + == params.current_config; + }) + | // quick type convert from ptr> -> ref_wrapper + std::ranges::views::transform( + [](const auto& node_and_decom_report) + -> std::reference_wrapper { + return node_and_decom_report.second.get().value(); + }) + | std::ranges::to>>(); + + if (decom_reports.size() < quorum) { + // not enough health reports to continue + return {}; + } + + // bin the filtered reports on decom vote count by node_id + std::unordered_map node_to_decom_vote_counts{}; + for (const auto& decom_report : decom_reports) { + for (const model::node_id candidate_node : + decom_report.get().nodes_past_auto_decom_timeout) { + node_to_decom_vote_counts[candidate_node] += 1; + } + } + + // filter out invalid choices which are + // 1. nodes without enough votes + // 2. nodes that are already decommissioning + // 3. 
nodes that are in maintanence mode + // - if a customer has to take a node down for over the auto decom + // timeout and doesn't want it ejected from the cluster, they should + // put it in maintanence mode + auto nodes_to_decommission + = node_to_decom_vote_counts | // filter invalid choices + std::ranges::views::filter( + [quorum, ¶ms](const auto& node_and_count) { + const auto& [node_id, decom_vote_count] = node_and_count; + return + // no nodes with less than a quorum of votes + decom_vote_count >= quorum + // no nodes that are already decommissioning + && !params.decommissioning_nodes.contains(node_id) + // no nodes in maintenance_mode + && !params.maintenance_mode_nodes.contains(node_id); + }) + | // shave it down to only node id + std::ranges::views::transform( + [](const auto& node_and_count) { return node_and_count.first; }) + | std::ranges::to>(); + + return nodes_to_decommission; +} + +void partition_balancer_planner::get_auto_decommission_actions( + request_context& ctx, const cluster_health_report& health_report) { + absl::flat_hash_set member_nodes{}; + for (auto node : ctx.all_nodes) { + member_nodes.insert(node); + } + + auto_decom_ref_map node_to_decom{}; + for (const auto& report : health_report.node_reports) { + vlog( + clusterlog.debug, + "auto decom report from node: {}, with decom status: {}", + report->id, + report->maybe_auto_decommission_status); + node_to_decom.emplace( + report->id, report->maybe_auto_decommission_status); + } + + ctx._nodes_to_be_decommed = do_get_auto_decommission_actions( + {.node_to_decom = node_to_decom, + .cluster_members = member_nodes, + .decommissioning_nodes = ctx.decommissioning_nodes, + .maintenance_mode_nodes = ctx.maintenance_mode_nodes, + .current_config = ctx.state().get_config_version()}); + + if (ctx._nodes_to_be_decommed.empty()) { + vlog(clusterlog.debug, "no auto decommission actions found"); + return; + } + + ss::sstring auto_decommission_log_line + = "found nodes to auto decommission: "; + for (auto 
node_id : ctx._nodes_to_be_decommed) { + auto_decommission_log_line += fmt::format("{}, ", node_id); + } + + vlog(clusterlog.info, "{}", auto_decommission_log_line); +} + void partition_balancer_planner::request_context::collect_actions( partition_balancer_planner::plan_data& result) { result.reassignments.reserve(_reassignments.size()); @@ -2171,9 +2306,14 @@ void partition_balancer_planner::request_context::collect_actions( result.reallocation_failures = std::move(_reallocation_failures); + result.decommissions.reserve(_nodes_to_be_decommed.size()); + std::ranges::move( + std::move(_nodes_to_be_decommed), + std::back_inserter(result.decommissions)); + if ( !result.cancellations.empty() || !result.reassignments.empty() - || result.counts_rebalancing_finished) { + || result.counts_rebalancing_finished || !result.decommissions.empty()) { result.status = status::actions_planned; } } @@ -2191,12 +2331,18 @@ partition_balancer_planner::plan_actions( co_return result; } + if (ctx.config().mode == model::partition_autobalancing_mode::continuous) { + get_auto_decommission_actions(ctx, health_report); + } + + // early exit if theres nothing to be done if ( result.violations.is_empty() && ctx.decommissioning_nodes.empty() && _state.ntps_with_broken_rack_constraint().empty() && _state.nodes_to_rebalance().empty() && _state.topics().partitions_to_force_recover().empty() - && !_config.ondemand_rebalance_requested) { + && !_config.ondemand_rebalance_requested + && ctx._nodes_to_be_decommed.empty()) { result.status = status::empty; co_return result; } diff --git a/src/v/cluster/partition_balancer_planner.h b/src/v/cluster/partition_balancer_planner.h index 1841478937ad2..e23afc73c47ab 100644 --- a/src/v/cluster/partition_balancer_planner.h +++ b/src/v/cluster/partition_balancer_planner.h @@ -22,6 +22,7 @@ #include namespace cluster { +class partition_balancer_planner_accessor; enum ntp_reassignment_type : int8_t { regular, force }; @@ -42,7 +43,12 @@ struct planner_config { 
double max_disk_usage_ratio; // Max number of actions that can be scheduled in one planning iteration size_t max_concurrent_actions; + // If a node is unresponsive for more than node_availability_timeout_sec, + // begin moving partitions off of that node std::chrono::seconds node_availability_timeout_sec; + // If a node is unresponsive for more than decommission timeout, launch a + // decommission operation against it + std::chrono::seconds decommission_timeout; // If the user manually requested a rebalance (not connected to node // addition) bool ondemand_rebalance_requested = false; @@ -86,6 +92,7 @@ class partition_balancer_planner { chunked_vector cancellations; chunked_hash_map reallocation_failures; + chunked_vector decommissions; bool counts_rebalancing_finished = false; size_t failed_actions_count = 0; status status = status::empty; @@ -123,13 +130,31 @@ class partition_balancer_planner { static ss::future<> get_full_node_actions(request_context&); static ss::future<> get_counts_rebalancing_actions(request_context&); static ss::future<> get_force_repair_actions(request_context&); + static void get_auto_decommission_actions( + request_context&, const cluster_health_report& health_report); static size_t calculate_full_disk_partition_move_priority( model::node_id, const reassignable_partition&, const request_context&); + using auto_decom_ref + = std::reference_wrapper>; + using auto_decom_ref_map + = absl::flat_hash_map; + struct do_get_auto_decommission_actions_params { + auto_decom_ref_map node_to_decom; + absl::flat_hash_set cluster_members; + absl::flat_hash_set decommissioning_nodes; + absl::flat_hash_set maintenance_mode_nodes; + config_version current_config; + }; + static absl::flat_hash_set do_get_auto_decommission_actions( + const do_get_auto_decommission_actions_params& params) noexcept; + planner_config _config; partition_balancer_state& _state; partition_allocator& _partition_allocator; + + friend class 
::cluster::partition_balancer_planner_accessor; }; } // namespace cluster diff --git a/src/v/cluster/partition_balancer_state.cc b/src/v/cluster/partition_balancer_state.cc index f5931fbcaf5ad..adc3ec9c18af5 100644 --- a/src/v/cluster/partition_balancer_state.cc +++ b/src/v/cluster/partition_balancer_state.cc @@ -29,11 +29,13 @@ partition_balancer_state::partition_balancer_state( ss::sharded& topic_table, ss::sharded& members_table, ss::sharded& pa, - ss::sharded& nst) + ss::sharded& nst, + std::unique_ptr config) : _topic_table(topic_table.local()) , _members_table(members_table.local()) , _partition_allocator(pa.local()) , _node_status(nst.local()) + , _config(std::move(config)) , _probe(*this) {} void partition_balancer_state::handle_ntp_move_begin_or_cancel( @@ -163,4 +165,16 @@ void partition_balancer_state::probe::setup_metrics( }); } +config_version partition_balancer_state::get_config_version() const { + auto maybe_config = _config->get_config(); + if (!maybe_config) { + vlog( + clusterlog.error, + "failed to get config version with error: {}", + maybe_config.error()); + } + vassert(maybe_config, "config version should not return an error"); + return *maybe_config; +} + } // namespace cluster diff --git a/src/v/cluster/partition_balancer_state.h b/src/v/cluster/partition_balancer_state.h index 89c0a25c64aa8..d95afa21ae308 100644 --- a/src/v/cluster/partition_balancer_state.h +++ b/src/v/cluster/partition_balancer_state.h @@ -12,7 +12,9 @@ #include "absl/container/btree_set.h" #include "absl/container/flat_hash_set.h" +#include "cluster/config/config_i.h" #include "cluster/fwd.h" +#include "cluster/types.h" #include "metrics/metrics.h" #include "model/fundamental.h" #include "model/metadata.h" @@ -20,6 +22,8 @@ #include +#include + namespace cluster { /// Class that stores state that is needed for functioning of the partition @@ -31,7 +35,8 @@ class partition_balancer_state { ss::sharded&, ss::sharded&, ss::sharded&, - ss::sharded&); + ss::sharded&, + 
std::unique_ptr); topic_table& topics() const { return _topic_table; } @@ -86,6 +91,8 @@ class partition_balancer_state { ss::future<> apply_snapshot(const controller_snapshot&); + config_version get_config_version() const; + private: struct probe { explicit probe(const partition_balancer_state&); @@ -102,6 +109,7 @@ class partition_balancer_state { members_table& _members_table; partition_allocator& _partition_allocator; node_status_table& _node_status; + std::unique_ptr _config; absl::btree_set _ntps_with_broken_rack_constraint; // revision increment to be paired with all updates // _ntps_with_broken_rack_constraint set. Relied upon by the iterator. diff --git a/src/v/cluster/tests/BUILD b/src/v/cluster/tests/BUILD index 779cf443599e4..42b7e99235a7b 100644 --- a/src/v/cluster/tests/BUILD +++ b/src/v/cluster/tests/BUILD @@ -68,6 +68,7 @@ redpanda_test_cc_library( ":utils", "//src/v/base", "//src/v/cluster", + "//src/v/cluster/config/test:test_config", "//src/v/config", "//src/v/features", "//src/v/model", @@ -149,6 +150,7 @@ redpanda_test_cc_library( ":utils", "//src/v/base", "//src/v/cluster", + "//src/v/cluster/config/test:test_config", "@boost//:test", ], ) @@ -1314,3 +1316,40 @@ redpanda_cc_gtest( "@googletest//:gtest", ], ) + +redpanda_cc_gtest( + name = "partition_balancer_autodecom_test", + timeout = "short", + srcs = [ + "partition_balancer_autodecom_test.cc", + ], + deps = [ + "//src/v/cluster", + "//src/v/config", + "//src/v/container:chunked_vector", + "//src/v/model", + "//src/v/random:generators", + "//src/v/test_utils:gtest", + "@googletest//:gtest", + "@seastar", + ], +) + +redpanda_cc_gtest( + name = "health_monitor_autodecom_test", + timeout = "short", + srcs = [ + "health_monitor_autodecom_test.cc", + ], + deps = [ + "//src/v/cluster", + "//src/v/config", + "//src/v/container:chunked_vector", + "//src/v/model", + "//src/v/random:generators", + "//src/v/rpc", + "//src/v/test_utils:gtest", + "@googletest//:gtest", + "@seastar", + ], +) diff --git 
a/src/v/cluster/tests/health_bench.cc b/src/v/cluster/tests/health_bench.cc index 0fbfbeecc57f5..57672439bfbb8 100644 --- a/src/v/cluster/tests/health_bench.cc +++ b/src/v/cluster/tests/health_bench.cc @@ -102,7 +102,11 @@ struct health_bench : health_report_accessor { for (auto& [node_id, topics] : node_topics) { reports[node_id] = ss::make_lw_shared( - node_id, node::local_state{}, std::move(topics), std::nullopt); + node_id, + node::local_state{}, + std::move(topics), + /* drain_status */ std::nullopt, + /* auto_decommission_status */ std::nullopt); } perf_tests::start_measuring_time(); diff --git a/src/v/cluster/tests/health_monitor_autodecom_test.cc b/src/v/cluster/tests/health_monitor_autodecom_test.cc new file mode 100644 index 0000000000000..ffc1b67bca9c6 --- /dev/null +++ b/src/v/cluster/tests/health_monitor_autodecom_test.cc @@ -0,0 +1,279 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "cluster/health_monitor_backend.h" +#include "cluster/health_monitor_types.h" +#include "model/fundamental.h" +#include "rpc/types.h" + +#include + +#include +#include + +namespace cluster { + +struct health_report_accessor { + using params + = health_monitor_backend::do_collect_auto_decommission_status_params; + + static constexpr auto now = rpc::clock_type::time_point{72h}; + static constexpr auto auto_decom_timeout = 24h; + static constexpr auto timed_out_timepoint = now - auto_decom_timeout - 1h; + static constexpr auto not_timed_out_timepoint = now - 12h; + static constexpr auto boot_time_zero = rpc::clock_type::time_point{0h}; + static constexpr auto boot_time_recent = rpc::clock_type::time_point{ + not_timed_out_timepoint - 2h}; + + static params get_default_params() { 
+ return { + .timeout_duration = auto_decom_timeout, + .now = now, + .default_last_seen = boot_time_zero, + .nodes = {}, + .node_status_getter = + [](model::node_id node_id) { + return node_status{.node_id = node_id, .last_seen = now}; + }, + .current_config_version = config_version{0} + + }; + } + + static health_monitor_backend::get_node_status_t getter_from_map( + std::unordered_map + last_seen_map) { + auto node_status_map + = std::move(last_seen_map) + | std::ranges::views::transform([](auto node_and_last_seen) { + return std::pair{ + node_and_last_seen.first, + node_status{ + .node_id = node_and_last_seen.first, + .last_seen = node_and_last_seen.second}}; + }) + | std::ranges::to< + std::unordered_map>(); + + return [node_status_map = std::move(node_status_map)]( + model::node_id node_id) -> std::optional { + auto it = node_status_map.find(node_id); + if (it == node_status_map.end()) { + return std::nullopt; + } + return it->second; + }; + } + + static std::vector members_from_map( + std::unordered_map + last_seen_map) { + auto members = std::move(last_seen_map) + | std::ranges::views::transform( + [](auto node_and_last_seen) { + return node_and_last_seen.first; + }) + | std::ranges::to>(); + return members; + } + + static std::pair< + std::vector, + health_monitor_backend::get_node_status_t> + members_and_getter_from_map( + std::unordered_map + last_seen_map) { + return { + members_from_map(last_seen_map), + getter_from_map(std::move(last_seen_map))}; + } + + // just checks that a healthy cluster spits out no decom reports + static void smoke_test() { + auto [members, getter] = members_and_getter_from_map( + {{model::node_id{0}, now}, + {model::node_id{1}, now}, + {model::node_id{2}, now}}); + auto params = get_default_params(); + params.nodes = std::move(members); + params.node_status_getter = std::move(getter); + + auto maybe_decom_status + = health_monitor_backend::do_collect_auto_decommission_status(params); + 
ASSERT_FALSE(maybe_decom_status.has_value()); + } + + // check that we get a decom report for an expired node + static void test_timed_out() { + auto [members, getter] = members_and_getter_from_map( + {{model::node_id{0}, now}, + {model::node_id{1}, now}, + {model::node_id{2}, timed_out_timepoint}}); + auto params = get_default_params(); + params.nodes = std::move(members); + params.node_status_getter = std::move(getter); + + auto maybe_decom_status + = health_monitor_backend::do_collect_auto_decommission_status(params); + ASSERT_TRUE(maybe_decom_status.has_value()); + ASSERT_EQ(maybe_decom_status->nodes_past_auto_decom_timeout.size(), 1); + ASSERT_TRUE( + maybe_decom_status->nodes_past_auto_decom_timeout[0] + == model::node_id{2}); + } + + // check that we refuse to decom a node if never seen when the default is + // above timeout timepoint + static void test_missing_status_no_timeout() { + std::unordered_map + node_to_last_seen = { + {model::node_id{0}, now}, + {model::node_id{1}, now}, + {model::node_id{2}, now}}; + + // members will have all the nodes + auto members = members_from_map(node_to_last_seen); + + // we'll be missing node status from 2 + node_to_last_seen.erase(model::node_id{2}); + auto getter = getter_from_map(node_to_last_seen); + + // construct params with a default time that is not timed out + auto params = get_default_params(); + params.nodes = std::move(members); + params.node_status_getter = std::move(getter); + params.default_last_seen = not_timed_out_timepoint; + + auto maybe_decom_status + = health_monitor_backend::do_collect_auto_decommission_status(params); + ASSERT_FALSE(maybe_decom_status.has_value()); + } + + // check that we do decom a missing node if the default time provided is far + // enough back to invoke timeout + static void test_missing_status_timeout() { + std::unordered_map + node_to_last_seen = { + {model::node_id{0}, now}, + {model::node_id{1}, now}, + {model::node_id{2}, now}}; + + // members will have all the nodes + 
auto members = members_from_map(node_to_last_seen); + + // we'll be missing node status from 2 + node_to_last_seen.erase(model::node_id{2}); + auto getter = getter_from_map(node_to_last_seen); + + // construct params with a default time that is not timed out + auto params = get_default_params(); + params.nodes = std::move(members); + params.node_status_getter = std::move(getter); + params.default_last_seen = boot_time_zero; + + auto maybe_decom_status + = health_monitor_backend::do_collect_auto_decommission_status(params); + ASSERT_TRUE(maybe_decom_status.has_value()); + ASSERT_EQ(maybe_decom_status->nodes_past_auto_decom_timeout.size(), 1); + ASSERT_TRUE( + maybe_decom_status->nodes_past_auto_decom_timeout[0] + == model::node_id{2}); + } + + // check that we're pushing the right config through + static void test_config() { + auto [members, getter] = members_and_getter_from_map( + {{model::node_id{0}, now}, + {model::node_id{1}, now}, + {model::node_id{2}, timed_out_timepoint}}); + auto params = get_default_params(); + params.nodes = std::move(members); + params.node_status_getter = std::move(getter); + params.current_config_version = config_version{0}; + { + auto maybe_decom_status + = health_monitor_backend::do_collect_auto_decommission_status( + params); + ASSERT_TRUE(maybe_decom_status.has_value()); + ASSERT_EQ( + maybe_decom_status->configuration_version, config_version{0}); + } + + { + params.current_config_version = config_version{100}; + auto maybe_decom_status + = health_monitor_backend::do_collect_auto_decommission_status( + params); + ASSERT_TRUE(maybe_decom_status.has_value()); + ASSERT_EQ( + maybe_decom_status->configuration_version, config_version{100}); + } + } + + // don't generate reports on non-cluster members + static void test_ignore_nonmembers() { + std::unordered_map + node_to_last_seen = { + {model::node_id{0}, now}, + {model::node_id{1}, now}, + {model::node_id{2}, timed_out_timepoint}}; + + // getter will have all + auto getter = 
getter_from_map(node_to_last_seen); + // 2 will not be a member + node_to_last_seen.erase(model::node_id{2}); + auto members = members_from_map(node_to_last_seen); + + // construct params with a default time that is not timed out + auto params = get_default_params(); + params.nodes = std::move(members); + params.node_status_getter = std::move(getter); + params.default_last_seen = boot_time_zero; + + auto maybe_decom_status + = health_monitor_backend::do_collect_auto_decommission_status(params); + ASSERT_FALSE(maybe_decom_status.has_value()); + } + + // time out multiple nodes + static void test_multi_time_out() { + auto [members, getter] = members_and_getter_from_map( + {{model::node_id{0}, timed_out_timepoint}, + {model::node_id{1}, timed_out_timepoint}, + {model::node_id{2}, timed_out_timepoint}}); + auto params = get_default_params(); + params.nodes = members; + params.node_status_getter = std::move(getter); + + auto maybe_decom_status + = health_monitor_backend::do_collect_auto_decommission_status(params); + ASSERT_TRUE(maybe_decom_status.has_value()); + ASSERT_EQ(maybe_decom_status->nodes_past_auto_decom_timeout.size(), 3); + ASSERT_EQ(maybe_decom_status->nodes_past_auto_decom_timeout, members); + } +}; + +TEST(AutoDecomTestSuite, Smoke) { health_report_accessor::smoke_test(); }; +TEST(AutoDecomTestSuite, OneTimedOut) { + health_report_accessor::test_timed_out(); +}; +TEST(AutoDecomTestSuite, MissingStatusNoTimeout) { + health_report_accessor::test_missing_status_no_timeout(); +}; +TEST(AutoDecomTestSuite, MissingStatusTimeout) { + health_report_accessor::test_missing_status_timeout(); +}; +TEST(AutoDecomTestSuite, Config) { health_report_accessor::test_config(); }; +TEST(AutoDecomTestSuite, NoNonMembers) { + health_report_accessor::test_ignore_nonmembers(); +}; +TEST(AutoDecomTestSuite, MultiTimeout) { + health_report_accessor::test_multi_time_out(); +}; +} // namespace cluster diff --git a/src/v/cluster/tests/health_monitor_bench.cc 
b/src/v/cluster/tests/health_monitor_bench.cc index 4143e1f462226..0bc1c0b334ebf 100644 --- a/src/v/cluster/tests/health_monitor_bench.cc +++ b/src/v/cluster/tests/health_monitor_bench.cc @@ -63,7 +63,12 @@ make_node_health_report(size_t num_topics, size_t partitions_per_topic) { topics.push_back(make_topic_status(i, partitions_per_topic)); } - return {id, local_state, std::move(topics), std::nullopt}; + return { + id, + local_state, + std::move(topics), + /* drain status */ std::nullopt, + /* auto_decommission_status */ std::nullopt}; } template diff --git a/src/v/cluster/tests/health_monitor_test.cc b/src/v/cluster/tests/health_monitor_test.cc index e91bdb0c92bcf..22392b1e6d928 100644 --- a/src/v/cluster/tests/health_monitor_test.cc +++ b/src/v/cluster/tests/health_monitor_test.cc @@ -374,7 +374,8 @@ make_nhr(int nid, const std::vector& statuses) { node_id(nid), node::local_state{}, chunked_vector(statuses.begin(), statuses.end()), - std::nullopt}; + /* drain_status */ std::nullopt, + /* maybe_auto_decommission_status */ std::nullopt}; }; struct node_and_status { @@ -441,7 +442,8 @@ FIXTURE_TEST(test_aggregate, health_report_unit) { model::node_id(0), node::local_state{}, chunked_vector{}, - std::nullopt)}}; + /* drain_status */ std::nullopt, + /* maybe_auto_decommission_status */ std::nullopt)}}; { // empty input, empty report diff --git a/src/v/cluster/tests/partition_balancer_autodecom_test.cc b/src/v/cluster/tests/partition_balancer_autodecom_test.cc new file mode 100644 index 0000000000000..6deaddb467659 --- /dev/null +++ b/src/v/cluster/tests/partition_balancer_autodecom_test.cc @@ -0,0 +1,532 @@ +// Copyright 2025 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "cluster/health_monitor_types.h" +#include "cluster/partition_balancer_planner.h" + +#include + +#include + +namespace cluster { + +// testing a private static function in partition_balancer_planner, we're using +// a friend class accessor to reach in +class partition_balancer_planner_accessor { + using params = cluster::partition_balancer_planner:: + do_get_auto_decommission_actions_params; + + // convenience representation of node state + // up_down: represents whether a node has been down past the auto decom timeout + // node_state: relevant special node statuses that should prevent a decom + // node_and_status: packaged node state so a test scenario can be a list of + // node_id and node state + enum class up_down : uint8_t { up, down }; + enum class node_state : uint8_t { normal, maintenance, decommissioning }; + struct node_and_status { + up_down up_or_down; + node_state node_state; + model::node_id node_id; + config_version config_version; + }; + + // package a set of params for determining auto decommission alongside + // a holder which will keep referenced memory alive + struct params_and_holders { + params params; + std::vector>> + report_memory_holder; + }; + + // translate the convenience representation to a parameters pack, and a list + // of memory holders to keep the packaged references alive + static params_and_holders + make_params(const std::vector& nodes) { + absl::flat_hash_set all_nodes{}; + absl::flat_hash_set downed_nodes{}; + absl::flat_hash_set maintenance_nodes{}; + absl::flat_hash_set decommissioning_nodes{}; + + for (const auto& node_status : nodes) { + if (node_status.up_or_down == up_down::down) { +
downed_nodes.insert(node_status.node_id); + } + all_nodes.insert(node_status.node_id); + + if (node_status.node_state == node_state::maintenance) { + maintenance_nodes.insert(node_status.node_id); + } + if (node_status.node_state == node_state::decommissioning) { + decommissioning_nodes.insert(node_status.node_id); + } + // nothing for normal + } + + // keeps the memory of a given auto decom report alive + std::vector>> + memory_holder{}; + + // index map upon the living reports + partition_balancer_planner::auto_decom_ref_map decom_report_map{}; + + for (const auto& node_status : nodes) { + if (downed_nodes.contains(node_status.node_id)) { + // skip downed nodes for report generation + continue; + } + + // for living nodes, gather the bearing memory and reference list on + // their auto decom reports + auto report_holder + = std::make_unique>( + std::nullopt); + // generate an actual decom status if there are dead nodes + if (!downed_nodes.empty()) { + auto_decommission_status_data adsd{ + .configuration_version = node_status.config_version, + .nodes_past_auto_decom_timeout = std::vector( + downed_nodes.begin(), downed_nodes.end())}; + report_holder + = std::make_unique>( + std::optional{std::move(adsd)}); + } + decom_report_map.emplace( + node_status.node_id, + std::reference_wrapper(std::as_const(*report_holder))); + memory_holder.emplace_back(std::move(report_holder)); + } + + // clang keeps putting the entire list of initializers on one line + // clang-format off + return params_and_holders{ + .params + = params{ + .node_to_decom = std::move(decom_report_map), + .cluster_members = std::move(all_nodes), + .decommissioning_nodes = std::move(decommissioning_nodes), + .maintenance_mode_nodes = std::move(maintenance_nodes), + .current_config = config_version{}}, + .report_memory_holder = std::move(memory_holder)}; + // clang-format on + } + +public: + static void smoke_test() { + params smoke_params{ + .node_to_decom = {}, + .cluster_members + = {model::node_id{0}, 
model::node_id{2}, model::node_id{1}}, + .decommissioning_nodes = {}, + .maintenance_mode_nodes = {}, + .current_config = config_version{0}}; + + auto result + = partition_balancer_planner::do_get_auto_decommission_actions( + smoke_params); + ASSERT_TRUE(result.empty()); + } + + static void test_normal_removal() { + // check that we can decommission a node when a majority votes that it's + // past timeout + auto [params, holder] = make_params( + {// node 0 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{0}, + .config_version = config_version{0}}, + + // node 1 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{1}, + .config_version = config_version{0}}, + + // node 3 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{3}, + .config_version = config_version{0}}, + + // node 2 down + node_and_status{ + .up_or_down = up_down::down, + .node_state = node_state::normal, + .node_id = model::node_id{2}, + .config_version = config_version{0}}}); + params.current_config = config_version{0}; + + auto result + = partition_balancer_planner::do_get_auto_decommission_actions( + params); + ASSERT_EQ(result.size(), 1); + + ASSERT_EQ(result.begin().operator*(), model::node_id{2}); + } + + static void test_votes_no_quorum() { + // check that we don't decommission if we don't have a quorum of votes + auto [params, holder] = make_params( + {// node 0 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{0}, + .config_version = config_version{0}}, + + // node 1 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{1}, + .config_version = config_version{0}}, + + // node 3 up and normal + node_and_status{ + .up_or_down =
up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{3}, + .config_version = config_version{0}}, + + // node 2 down + node_and_status{ + .up_or_down = up_down::down, + .node_state = node_state::normal, + .node_id = model::node_id{2}, + .config_version = config_version{0}}}); + params.current_config = config_version{0}; + + // pluck node 3's report from the reports map + params.node_to_decom.erase(model::node_id{3}); + // now there are two reports that 2 is dead, from 0 and 1 + + auto result + = partition_balancer_planner::do_get_auto_decommission_actions( + params); + ASSERT_EQ(result.size(), 0); + } + + static void test_ignore_incorrect_config() { + // check that we ignore votes from old configurations + config_version current_config{1}; + config_version old_config{0}; + auto [params, holder] = make_params( + {// node 0 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{0}, + .config_version = current_config}, + + // node 1 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{1}, + .config_version = current_config}, + + // node 3 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{3}, + .config_version = old_config}, + + // node 2 down + node_and_status{ + .up_or_down = up_down::down, + .node_state = node_state::normal, + .node_id = model::node_id{2}, + .config_version = current_config}}); + + params.current_config = current_config; + + auto result + = partition_balancer_planner::do_get_auto_decommission_actions( + params); + ASSERT_EQ(result.size(), 0); + } + + static void test_dont_decom_twice() { + // make sure a decommissioning node doesn't get another decommissioning + // command + auto [params, holder] = make_params( + {// node 0 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = 
node_state::normal, + .node_id = model::node_id{0}, + .config_version = config_version{0}}, + + // node 1 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{1}, + .config_version = config_version{0}}, + + // node 3 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{3}, + .config_version = config_version{0}}, + + // node 2 down + node_and_status{ + .up_or_down = up_down::down, + .node_state = node_state::decommissioning, + .node_id = model::node_id{2}, + .config_version = config_version{0}}}); + params.current_config = config_version{0}; + + auto result + = partition_balancer_planner::do_get_auto_decommission_actions( + params); + ASSERT_EQ(result.size(), 0); + } + + static void test_dont_decom_maintenance() { + // if a node is in maintenance mode, dont decommission it + auto [params, holder] = make_params( + {// node 0 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{0}, + .config_version = config_version{0}}, + + // node 1 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{1}, + .config_version = config_version{0}}, + + // node 3 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{3}, + .config_version = config_version{0}}, + + // node 2 down + node_and_status{ + .up_or_down = up_down::down, + .node_state = node_state::maintenance, + .node_id = model::node_id{2}, + .config_version = config_version{0}}}); + params.current_config = config_version{0}; + + auto result + = partition_balancer_planner::do_get_auto_decommission_actions( + params); + ASSERT_EQ(result.size(), 0); + } + + static void test_ignore_non_members() { + // make sure that we ignore reports from nodes that are no longer + // 
cluster members + config_version current_config{1}; + config_version old_config{0}; + auto [params, holder] = make_params({ + // node 0 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{0}, + .config_version = current_config}, + + // node 1 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{1}, + .config_version = current_config}, + + // node 3 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{3}, + .config_version = old_config}, + + // node 2 down + node_and_status{ + .up_or_down = up_down::down, + .node_state = node_state::maintenance, + .node_id = model::node_id{2}, + .config_version = current_config}, + + // node will be a non-member + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{4}, + .config_version = current_config}, + + // node 5 will be a non-member + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{5}, + .config_version = current_config}, + + }); + params.current_config = current_config; + + // drop 4 and 5, this should leave not enough votes to remove 2 + params.cluster_members.erase(model::node_id{4}); + params.cluster_members.erase(model::node_id{5}); + + auto result + = partition_balancer_planner::do_get_auto_decommission_actions( + params); + ASSERT_EQ(result.size(), 0); + } + + static void test_multi_decom() { + // checks that given a large enough cluster, we can decom multiple nodes + // at the same time + config_version current_config{0}; + std::vector nodes_and_statuses{}; + + for (auto node_number : std::ranges::iota_view(0, 5)) { + auto node_id = model::node_id{node_number}; + nodes_and_statuses.emplace_back( + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + 
.node_id = node_id, + .config_version = current_config}); + } + nodes_and_statuses[0].up_or_down = up_down::down; + nodes_and_statuses[1].up_or_down = up_down::down; + auto [params, holder] = make_params(nodes_and_statuses); + params.current_config = current_config; + + auto result + = partition_balancer_planner::do_get_auto_decommission_actions( + params); + ASSERT_EQ(result.size(), 2); + ASSERT_TRUE(result.contains(model::node_id{0})); + ASSERT_TRUE(result.contains(model::node_id{1})); + } + + static void test_quorum_limits() { + // we should only decom a node if a quorum agrees that the node is past + // timeout + // this test checks the limits for even & odd cluster numbers + config_version current_config{0}; + std::vector nodes_and_statuses{}; + int dead_node_number{0}; + model::node_id dead_node{dead_node_number}; + + { + // case: 5 + for (auto node_number : std::ranges::iota_view(0, 5)) { + auto node_id = model::node_id{node_number}; + nodes_and_statuses.emplace_back( + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = node_id, + .config_version = current_config}); + } + nodes_and_statuses[dead_node_number].up_or_down = up_down::down; + auto [params, holder] = make_params(nodes_and_statuses); + params.current_config = current_config; + { // all agree + auto result = partition_balancer_planner:: + do_get_auto_decommission_actions(params); + ASSERT_EQ(result.size(), 1); + ASSERT_TRUE(result.contains(dead_node)); + } + { // 3 agree 1 does not + params.node_to_decom.erase(model::node_id{1}); + auto result = partition_balancer_planner:: + do_get_auto_decommission_actions(params); + ASSERT_EQ(result.size(), 1); + ASSERT_TRUE(result.contains(dead_node)); + } + { // 2 agree 2 do not + params.node_to_decom.erase(model::node_id{2}); + auto result = partition_balancer_planner:: + do_get_auto_decommission_actions(params); + ASSERT_EQ(result.size(), 0); + } + } + { + // case: 6 + for (auto node_number : 
std::ranges::iota_view(0, 6)) { + auto node_id = model::node_id{node_number}; + nodes_and_statuses.emplace_back( + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = node_id, + .config_version = current_config}); + } + nodes_and_statuses[dead_node_number].up_or_down = up_down::down; + auto [params, holder] = make_params(nodes_and_statuses); + params.current_config = current_config; + { // all agree + auto result = partition_balancer_planner:: + do_get_auto_decommission_actions(params); + ASSERT_EQ(result.size(), 1); + ASSERT_TRUE(result.contains(dead_node)); + } + { // 4 agree 1 does not + params.node_to_decom.erase(model::node_id{1}); + auto result = partition_balancer_planner:: + do_get_auto_decommission_actions(params); + ASSERT_EQ(result.size(), 1); + ASSERT_TRUE(result.contains(dead_node)); + } + { // 3 agree 2 do not + params.node_to_decom.erase(model::node_id{2}); + auto result = partition_balancer_planner:: + do_get_auto_decommission_actions(params); + ASSERT_EQ(result.size(), 0); + } + } + } +}; + +TEST(AutoDecomTestSuite, Smoke) { + partition_balancer_planner_accessor::smoke_test(); +}; +TEST(AutoDecomTestSuite, NormalRemove) { + partition_balancer_planner_accessor::test_normal_removal(); +}; +TEST(AutoDecomTestSuite, NoQuorum) { + partition_balancer_planner_accessor::test_votes_no_quorum(); +} +TEST(AutoDecomTestSuite, OldConfig) { + partition_balancer_planner_accessor::test_ignore_incorrect_config(); +} +TEST(AutoDecomTestSuite, DontDecomTwice) { + partition_balancer_planner_accessor::test_dont_decom_twice(); +} +TEST(AutoDecomTestSuite, DontDecomMaintenance) { + partition_balancer_planner_accessor::test_dont_decom_maintenance(); +} +TEST(AutoDecomTestSuite, IgnoreNonMembers) { + partition_balancer_planner_accessor::test_ignore_non_members(); +} +TEST(AutoDecomTestSuite, DecomMultipleNodes) { + partition_balancer_planner_accessor::test_multi_decom(); +} +TEST(AutoDecomTestSuite, QuorumLimits) { + 
partition_balancer_planner_accessor::test_quorum_limits(); +} +} // namespace cluster diff --git a/src/v/cluster/tests/partition_balancer_planner_fixture.h b/src/v/cluster/tests/partition_balancer_planner_fixture.h index bb746a911c135..6daace22e844d 100644 --- a/src/v/cluster/tests/partition_balancer_planner_fixture.h +++ b/src/v/cluster/tests/partition_balancer_planner_fixture.h @@ -13,6 +13,7 @@ #include "base/units.h" #include "cluster/commands.h" +#include "cluster/config/test/test_config.h" #include "cluster/data_migrated_resources.h" #include "cluster/health_monitor_types.h" #include "cluster/members_table.h" @@ -109,7 +110,8 @@ struct controller_workers { std::ref(table), std::ref(members), std::ref(allocator), - std::ref(node_status_table)) + std::ref(node_status_table), + ss::sharded_parameter(cluster::test::test_config::make_default)) .get(); } @@ -410,7 +412,8 @@ struct partition_balancer_planner_fixture { model::node_id(i), local_state, std::move(node_topics), - std::nullopt)); + /* drain status */ std::nullopt, + /* maybe_auto_decommission_status */ std::nullopt)); } return health_report; diff --git a/src/v/cluster/tests/partition_balancer_simulator_test.cc b/src/v/cluster/tests/partition_balancer_simulator_test.cc index 6888a885a88e6..983db77c3255b 100644 --- a/src/v/cluster/tests/partition_balancer_simulator_test.cc +++ b/src/v/cluster/tests/partition_balancer_simulator_test.cc @@ -740,7 +740,11 @@ class partition_balancer_sim_fixture { return ss::make_foreign( ss::make_lw_shared( - id, local_state, std::move(topics), std::nullopt)); + id, + local_state, + std::move(topics), + /* drain_status */ std::nullopt, + /*maybe_auto_decommission_status */ std::nullopt)); } }; diff --git a/src/v/cluster/tests/randoms.h b/src/v/cluster/tests/randoms.h index dba895ac84357..4aee366f9de14 100644 --- a/src/v/cluster/tests/randoms.h +++ b/src/v/cluster/tests/randoms.h @@ -95,7 +95,8 @@ inline node_health_report random_node_health_report() { 
tests::random_named_int(), node::random_local_state(), tests::random_chunked_vector(random_topic_status), - random_ds}; + random_ds, + /* maybe_auto_decommission_status */ std::nullopt}; } inline cluster_health_report random_cluster_health_report() { diff --git a/src/v/cluster/tests/serialization_rt_test.cc b/src/v/cluster/tests/serialization_rt_test.cc index 785edaa99087f..636affc67ad20 100644 --- a/src/v/cluster/tests/serialization_rt_test.cc +++ b/src/v/cluster/tests/serialization_rt_test.cc @@ -767,7 +767,8 @@ cluster::cluster_health_report random_cluster_health_report() { tests::random_named_int(), random_local_state(), std::move(topics), - random_drain_status()); + random_drain_status(), + /*maybe_auto_decommission_status*/ std::nullopt); // Reduce to an ADL-encodable state report.local_state.cache_disk = std::nullopt; @@ -1659,7 +1660,8 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) { tests::random_named_int(), random_local_state(), std::move(topics), - random_drain_status()); + random_drain_status(), + /*maybe_auto_decommission_status*/ std::nullopt); // Squash local_state to a form that ADL represents, since we will // test ADL roundtrip. 
@@ -1676,7 +1678,8 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) { tests::random_named_int(), random_local_state(), std::move(topics), - random_drain_status()); + random_drain_status(), + /*maybe_auto_decommission_status*/ std::nullopt); // Squash to ADL-understood disk state report.local_state.cache_disk = report.local_state.data_disk; diff --git a/src/v/cluster/tests/topic_table_fixture.h b/src/v/cluster/tests/topic_table_fixture.h index a61a1cb2ec515..a4770b0016eb2 100644 --- a/src/v/cluster/tests/topic_table_fixture.h +++ b/src/v/cluster/tests/topic_table_fixture.h @@ -12,6 +12,7 @@ #pragma once #include "base/units.h" +#include "cluster/config/test/test_config.h" #include "cluster/data_migrated_resources.h" #include "cluster/members_table.h" #include "cluster/node_status_table.h" @@ -76,7 +77,8 @@ struct topic_table_fixture { std::ref(table), std::ref(members), std::ref(allocator), - std::ref(node_status)) + std::ref(node_status), + ss::sharded_parameter(cluster::test::test_config::make_default)) .get(); } diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 9f21a0537f2a6..bc3a466d30e07 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -3274,6 +3274,15 @@ configuration::configuration() "`continuous`. ", {.needs_restart = needs_restart::no, .visibility = visibility::user}, 15min) + , partition_autobalancing_node_autodecommission_time( + *this, + "partition_autobalancing_node_autodecommission_time", + "When a node is unavailable for at least this timeout duration, it " + "triggers Redpanda to decommission the node. This property " + "applies only when `partition_autobalancing_mode` is set to " + "`continuous`. 
", + {.needs_restart = needs_restart::no, .visibility = visibility::user}, + 24h) , partition_autobalancing_max_disk_usage_percent( *this, "partition_autobalancing_max_disk_usage_percent", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 759d65f728539..d49ef0f83e084 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -604,6 +604,10 @@ struct configuration final : public config_store { partition_autobalancing_mode; property partition_autobalancing_node_availability_timeout_sec; + + property + partition_autobalancing_node_autodecommission_time; + bounded_property partition_autobalancing_max_disk_usage_percent; property partition_autobalancing_tick_interval_ms; diff --git a/tests/rptest/tests/auto_decommission_test.py b/tests/rptest/tests/auto_decommission_test.py new file mode 100644 index 0000000000000..88db9652c67fc --- /dev/null +++ b/tests/rptest/tests/auto_decommission_test.py @@ -0,0 +1,360 @@ +# Copyright 2020 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +import random +import time + +from ducktape.cluster.cluster import ClusterNode +from ducktape.utils.util import wait_until +from ducktape.tests.test import TestContext + +from rptest.clients.rpk import RpkTool +from rptest.clients.types import TopicSpec +from rptest.services.cluster import cluster +from rptest.services.kgo_verifier_services import ( + KgoVerifierMultiProducer, + KgoVerifierParams, + KgoVerifierMultiConsumerGroupConsumer, +) +from rptest.services.redpanda import CHAOS_LOG_ALLOW_LIST +from rptest.tests.prealloc_nodes import PreallocNodesTest + + +class AutoDecommissionTest(PreallocNodesTest): + """ + Test automatic node decommissioning when a node is unresponsive. 
+ """ + + def __init__(self, test_context: TestContext): + self._topic = None + self._topics = None + + super(AutoDecommissionTest, self).__init__( + test_context=test_context, + num_brokers=5, + node_prealloc_count=2, + ) + + def setup(self): + # defer starting redpanda to test body + pass + + @property + def admin(self): + return self.redpanda._admin + + def _create_topics(self, replication_factors: list[int] = [1, 3]): + """ + :return: total number of partitions in all topics + """ + total_partitions = 0 + topics: list[TopicSpec] = [] + for enumeration in range(10): + partitions = random.randint(1, 10) + spec = TopicSpec( + name=f"topic-{enumeration}", + partition_count=partitions, + replication_factor=random.choice(replication_factors), + ) + topics.append(spec) + total_partitions += partitions + + rpk = RpkTool(self.redpanda) + for spec in topics: + rpk.create_topic( + topic=spec.name, + partitions=spec.partition_count, + replicas=spec.replication_factor, + ) + + # self._topic = random.choice(topics).name + self._topics = topics + + return total_partitions + + def _not_decommissioned_node(self, decommed_node_id: int) -> ClusterNode: + return [ + n + for n in self.redpanda.started_nodes() + if self.redpanda.node_id(n) != decommed_node_id + ][0] + + @property + def msg_size(self) -> int: + return 64 + + @property + def msg_count(self) -> int: + # test should run for ~90s, so throughput over msg size * expected runtime should yield runtime + return int(90 * self.producer_throughput / self.msg_size) + + @property + def producer_throughput(self) -> int: + # this is the total throughput for the entire producer + return 1024 + + def _get_messages_per_topic(self) -> int: + # total messages over number of topics + assert self._topics is not None, ( + "_topics list must be initialized by the time _get_messages_per_topic is called" + ) + return int(self.msg_count / len(self._topics)) + + def _get_throughput_per_topic(self) -> int: + # total throughput over number of 
topics + assert self._topics is not None, ( + "_topics list must be initialized by the time _get_throughput_per_topic is called" + ) + return int(self.producer_throughput / len(self._topics)) + + def _start_producer(self) -> None: + self.redpanda.logger.info( + f"starting kgo-verifier producer with expected runtime of {self.msg_count / self.producer_throughput}" + ) + assert self._topics is not None, "topics must be defined to start producer" + params = [ + KgoVerifierParams( + topic=topic, + msg_size=self.msg_size, + msg_count=self._get_messages_per_topic(), + rate_limit_bps=self._get_throughput_per_topic(), + node=self.preallocated_nodes[0], + ) + for topic in self._topics + ] + self.producer = KgoVerifierMultiProducer( + context=self.test_context, + redpanda=self.redpanda, + topics=params, + custom_node=self.preallocated_nodes[0:1], + ) + + self.producer.start() + + self.producer.wait_for_acks([10 for _ in self._topics], 15, 1) + + def _start_consumer(self) -> None: + assert self._topics is not None, "topics must be defined to start consumer" + params = [ + KgoVerifierParams( + topic=topic, + msg_size=self.msg_size, + msg_count=self._get_messages_per_topic(), + node=self.preallocated_nodes[1], + ) + for topic in self._topics + ] + self.consumer = KgoVerifierMultiConsumerGroupConsumer( + self.test_context, + self.redpanda, + topics=params, + custom_node=self.preallocated_nodes[1:], + ) + + self.consumer.start() + + def verify(self): + self.redpanda.logger.info( + f"verifying workload: topic: {self._topic}, " + + f"with [rate_limit: {self.producer_throughput}, message size: {self.msg_size}," + + f"message count: {self.msg_count}]" + ) + # let the producer and consumer finish + self.producer.wait() + self.consumer.wait() + + def start_redpanda(self, new_bootstrap: bool = True): + if new_bootstrap: + self.redpanda.set_seed_servers(self.redpanda.nodes) + + self.redpanda.start( + auto_assign_node_id=new_bootstrap, omit_seeds_on_idx_one=not new_bootstrap + ) + + 
@cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST) + def test_automatic_node_decommissioning(self): + """ + Test that a node is automatically decommissioned when it's unresponsive + for the configured timeout period. + """ + # tick < unavailable < autodecommission + partition_balancer_tick_interval_ms = 5000 + partition_balancer_unavailable_timeout_s = 15 + autodecommission_timeout_s = 30 + + # Configure partition autobalancing for auto-decommission + self.redpanda.add_extra_rp_conf( + { + "partition_autobalancing_mode": "continuous", + "partition_autobalancing_node_availability_timeout_sec": partition_balancer_unavailable_timeout_s, + "partition_autobalancing_node_autodecommission_time": autodecommission_timeout_s, + "partition_autobalancing_tick_interval_ms": partition_balancer_tick_interval_ms, + } + ) + + self.start_redpanda(new_bootstrap=True) + self._create_topics(replication_factors=[3]) + + self._start_producer() + self._start_consumer() + + # Select a random node to make unresponsive + to_decommission = random.choice(self.redpanda.nodes) + node_id = self.redpanda.node_id(to_decommission) + + self.redpanda.logger.info( + f"Stopping node {node_id} to trigger automatic decommissioning" + ) + + # Stop the node so it registers in the decom logic + self.redpanda.stop_node(node=to_decommission) + + # Wait for the timeout period plus buffer for processing + # Total wait: timeout + extra time for detection and decommission start + wait_time_sec = autodecommission_timeout_s * 4 + self.redpanda.logger.info( + f"Waiting {wait_time_sec} seconds for automatic decommissioning to trigger" + ) + + # Just make sure we're not pinging the dead node + survivor_node = self._not_decommissioned_node(node_id) + + # Verify that the node status changes to 'draining' (decommissioning) + def node_is_removed(): + try: + brokers = self.admin.get_brokers(node=survivor_node) + for b in brokers: + if b["node_id"] == node_id: + return False + return True + except Exception as e: + 
self.redpanda.logger.info(f"Error checking broker status: {e}") + return False + + # Decommission will remove the node, wait for it + wait_until( + node_is_removed, + timeout_sec=wait_time_sec, + backoff_sec=5, + err_msg=f"Node {node_id} was not automatically decommissioned after {wait_time_sec} seconds", + ) + + self.redpanda.logger.info(f"Node {node_id} was successfully removed") + + # Finish the producers and consumers + self.verify() + + @cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST) + def test_decom_timer_reset(self): + """ + Auto decommission safety requires that the timer for auto decommission resets when a node restarts + (if this weren't the case, a restart of a quorum of nodes could easily cause an early node ejection). + This test is meant to check that auto ejection DOES get delayed by a sufficient number of node restarts. + """ + # tick < unavailable < autodecommission + partition_balancer_tick_interval_ms = 5000 + partition_balancer_unavailable_timeout_s = 15 + autodecommission_timeout_s = 60 + + # Configure partition autobalancing for auto-decommission + self.redpanda.add_extra_rp_conf( + { + "partition_autobalancing_mode": "continuous", + "partition_autobalancing_node_availability_timeout_sec": partition_balancer_unavailable_timeout_s, + "partition_autobalancing_node_autodecommission_time": autodecommission_timeout_s, + "partition_autobalancing_tick_interval_ms": partition_balancer_tick_interval_ms, + } + ) + + self.start_redpanda(new_bootstrap=True) + self._create_topics(replication_factors=[3]) + + self._start_producer() + self._start_consumer() + + # Select a random node to make unresponsive + to_decommission = random.choice(self.redpanda.nodes) + to_decommission_node_id = self.redpanda.node_id(to_decommission) + + # just make sure we're not pinging the dead node + survivor_node = self._not_decommissioned_node(to_decommission_node_id) + + self.redpanda.logger.info( + f"Stopping node {to_decommission_node_id} to trigger automatic 
decommissioning" + ) + + # Stop the node so it decoms + self.redpanda.stop_node(node=to_decommission) + + # Sleep for half of the auto decom timeout + time.sleep(autodecommission_timeout_s / 2) + + # Restart the remaining nodes + self.redpanda.restart_nodes( + nodes=[ + node + for node in self.redpanda.nodes + if self.redpanda.node_id(node) + is not self.redpanda.node_id(to_decommission) + ], + auto_assign_node_id=True, # on restart, let the nodes resume with their prior id + ) + + # Wait for the controller to return + controller_leader_id = self.admin.await_stable_leader( + "controller", namespace="redpanda" + ) + self.redpanda.logger.debug(f"controller leader id: {controller_leader_id}") + + # The leader balancer starts after 30s, sleep for 45 seconds at which point + # a balancer action should have happened and if the timer was genuinely reset, + # the dead node should not be eligible for decommissioning yet + time.sleep(autodecommission_timeout_s * 0.75) + + # Check that the dead broker has not been decommissioned + broker_statuses = self.admin.get_brokers(node=survivor_node) + was_decommissioned: bool = to_decommission_node_id not in [ + int(broker_status["node_id"]) for broker_status in broker_statuses + ] + assert was_decommissioned == False, ( + "a restart of the brokers should reset the autodecommission timeout" + ) + + # Wait for the remainder of the timeout to make sure it does eventually decom + wait_time_sec = autodecommission_timeout_s * 2 + self.redpanda.logger.info( + f"Waiting {wait_time_sec} seconds for automatic decommissioning to trigger" + ) + + # Verify that the dead node has been removed from the broker list + def node_is_removed(): + try: + brokers = self.admin.get_brokers(node=survivor_node) + for b in brokers: + if b["node_id"] == to_decommission_node_id: + return False + return True + except Exception as e: + self.redpanda.logger.info(f"Error checking broker status: {e}") + return False + + # Wait for the node to be removed from the cluster
+ wait_until( + node_is_removed, + timeout_sec=wait_time_sec, + backoff_sec=5, + err_msg=f"Node {to_decommission_node_id} was not automatically decommissioned after {wait_time_sec} seconds", + ) + + self.redpanda.logger.info( + f"Node {to_decommission_node_id} was automatically marked for decommissioning" + ) + + # Finish the producers and consumers + self.verify()