From 617f61142b6469a7b1326bbaccb17d455ef81e76 Mon Sep 17 00:00:00 2001 From: joe-redpanda Date: Mon, 15 Dec 2025 10:47:40 -0800 Subject: [PATCH 01/11] cluster/config: add an interface for config Upcoming changes will require accessing the cluster's config_version for correctness checks, but config_manager is a heavy dependency. This commit adds 1. an interface for cluster configs - (currently just config_version fetching) 2. an implementation for cluster_config backed by config_backend 3. a test impl which allows the user to bind a config_i to a lambda --- src/v/cluster/BUILD | 3 +++ src/v/cluster/config/cluster_config.cc | 20 ++++++++++++++++ src/v/cluster/config/cluster_config.h | 19 +++++++++++++++ src/v/cluster/config/config_i.h | 18 ++++++++++++++ src/v/cluster/config/test/BUILD | 12 ++++++++++ src/v/cluster/config/test/test_config.h | 31 +++++++++++++++++++++++++ 6 files changed, 103 insertions(+) create mode 100644 src/v/cluster/config/cluster_config.cc create mode 100644 src/v/cluster/config/cluster_config.h create mode 100644 src/v/cluster/config/config_i.h create mode 100644 src/v/cluster/config/test/BUILD create mode 100644 src/v/cluster/config/test/test_config.h diff --git a/src/v/cluster/BUILD b/src/v/cluster/BUILD index e85a48e8859a5..4d99ea5a54d63 100644 --- a/src/v/cluster/BUILD +++ b/src/v/cluster/BUILD @@ -318,6 +318,7 @@ redpanda_cc_library( "cluster_recovery_reconciler.cc", "cluster_recovery_table.cc", "cluster_utils.cc", + "config/cluster_config.cc", "config_frontend.cc", "config_manager.cc", "controller.cc", @@ -486,6 +487,8 @@ redpanda_cc_library( "cluster_utils.h", "cluster_uuid.h", "commands.h", + "config/cluster_config.h", + "config/config_i.h", "config_frontend.h", "config_manager.h", "controller.h", diff --git a/src/v/cluster/config/cluster_config.cc b/src/v/cluster/config/cluster_config.cc new file mode 100644 index 0000000000000..ddd3030eef677 --- /dev/null +++ b/src/v/cluster/config/cluster_config.cc @@ -0,0 +1,20 @@ + +#include 
"cluster/config/cluster_config.h" + +#include "cluster/config_manager.h" +#include "cluster/types.h" + +namespace cluster { +cluster_config::cluster_config( + ss::sharded& config_manager) noexcept + : _config_manager(config_manager) {} + +std::expected +cluster_config::get_config() const noexcept { + vassert( + ss::this_shard_id() == cluster::config_manager::shard, + "This operation can only be invoked on the config_manager shard"); + return _config_manager.local().get_version(); +} + +} // namespace cluster diff --git a/src/v/cluster/config/cluster_config.h b/src/v/cluster/config/cluster_config.h new file mode 100644 index 0000000000000..1ca1a1d1854f1 --- /dev/null +++ b/src/v/cluster/config/cluster_config.h @@ -0,0 +1,19 @@ + +#pragma once + +#include "cluster/config/config_i.h" +#include "cluster/config_manager.h" + +namespace cluster { + +class cluster_config : public config_i { +public: + explicit cluster_config( + ss::sharded& config_manager) noexcept; + std::expected + get_config() const noexcept override; + +private: + ss::sharded& _config_manager; +}; +} // namespace cluster diff --git a/src/v/cluster/config/config_i.h b/src/v/cluster/config/config_i.h new file mode 100644 index 0000000000000..aa04d91c6dd84 --- /dev/null +++ b/src/v/cluster/config/config_i.h @@ -0,0 +1,18 @@ + +#pragma once + +#include "cluster/types.h" + +#include + +namespace cluster { + +// interface for cluster configuration +class config_i { +public: + virtual std::expected + get_config() const noexcept = 0; + + virtual ~config_i() = default; +}; +} // namespace cluster diff --git a/src/v/cluster/config/test/BUILD b/src/v/cluster/config/test/BUILD new file mode 100644 index 0000000000000..8ff5fa2bcc955 --- /dev/null +++ b/src/v/cluster/config/test/BUILD @@ -0,0 +1,12 @@ +load("//bazel:build.bzl", "redpanda_cc_library") + +redpanda_cc_library( + name = "test_config", + hdrs = [ + "test_config.h", + ], + visibility = ["//visibility:public"], + deps = [ + "//src/v/cluster", + ], +) diff 
--git a/src/v/cluster/config/test/test_config.h b/src/v/cluster/config/test/test_config.h new file mode 100644 index 0000000000000..1f8df92fc55a1 --- /dev/null +++ b/src/v/cluster/config/test/test_config.h @@ -0,0 +1,31 @@ + +#pragma once + +#include "cluster/config/config_i.h" +#include "cluster/types.h" + +namespace cluster::test { +class test_config : public config_i { +public: + using functor_t + = std::function()>; + + explicit test_config(functor_t function) noexcept + : _function(std::move(function)) {} + + std::expected + get_config() const noexcept override { + return _function(); + } + + static inline std::unique_ptr make_default() { + return std::make_unique([] { + return std::expected{ + config_version{0}}; + }); + } + +private: + functor_t _function; +}; +} // namespace cluster::test From e09e46f540b18ad7a4d146aadf03d5511eb4ea37 Mon Sep 17 00:00:00 2001 From: joe-redpanda Date: Wed, 26 Nov 2025 11:00:11 -0800 Subject: [PATCH 02/11] config: add auto decommission configuration Adds partition_autobalancing_node_autodecommission_time which is the time in seconds after which partition balancer planner should begin decommissioning a node which is unresponsive. --- src/v/config/configuration.cc | 9 +++++++++ src/v/config/configuration.h | 4 ++++ 2 files changed, 13 insertions(+) diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 9f21a0537f2a6..bc3a466d30e07 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -3274,6 +3274,15 @@ configuration::configuration() "`continuous`. ", {.needs_restart = needs_restart::no, .visibility = visibility::user}, 15min) + , partition_autobalancing_node_autodecommission_time( + *this, + "partition_autobalancing_node_autodecommission_time", + "When a node is unavailable for at least this timeout duration, it " + "triggers Redpanda to decommission the node. This property " + "applies only when `partition_autobalancing_mode` is set to " + "`continuous`. 
", + {.needs_restart = needs_restart::no, .visibility = visibility::user}, + 24h) , partition_autobalancing_max_disk_usage_percent( *this, "partition_autobalancing_max_disk_usage_percent", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 759d65f728539..d49ef0f83e084 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -604,6 +604,10 @@ struct configuration final : public config_store { partition_autobalancing_mode; property partition_autobalancing_node_availability_timeout_sec; + + property + partition_autobalancing_node_autodecommission_time; + bounded_property partition_autobalancing_max_disk_usage_percent; property partition_autobalancing_tick_interval_ms; From 5ecf6e1e5eb8d8ddc94e2b4bc6bdc1d93436107c Mon Sep 17 00:00:00 2001 From: joe-redpanda Date: Wed, 26 Nov 2025 11:01:23 -0800 Subject: [PATCH 03/11] partition_balancer_*: implement auto decommission Wires partition_autobalancing_node_autodecommission_time into partition balancer. This commit adds the basicmost implementation of auto decommissioning which is based on the last seen from the perspective of the current controller broker. This implementation will run into problems when controller leadership changes. In future commits, this will be changed for a coordinated approach where the partition_balancer_planner will instead use the cluster health report to seek the consent of a quorum of nodes before decommissioning a broker. 
--- src/v/cluster/controller.cc | 2 ++ src/v/cluster/partition_balancer_backend.cc | 24 ++++++++++++++++++++- src/v/cluster/partition_balancer_backend.h | 2 ++ src/v/cluster/partition_balancer_planner.cc | 19 +++++++++++++++- src/v/cluster/partition_balancer_planner.h | 6 ++++++ 5 files changed, 51 insertions(+), 2 deletions(-) diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc index e2ee1685514ce..87c777bbcd37d 100644 --- a/src/v/cluster/controller.cc +++ b/src/v/cluster/controller.cc @@ -798,6 +798,8 @@ ss::future<> controller::start( std::ref(_members_frontend), config::shard_local_cfg() .partition_autobalancing_node_availability_timeout_sec.bind(), + config::shard_local_cfg() + .partition_autobalancing_node_autodecommission_time.bind(), config::shard_local_cfg() .partition_autobalancing_max_disk_usage_percent.bind(), config::shard_local_cfg().partition_autobalancing_tick_interval_ms.bind(), diff --git a/src/v/cluster/partition_balancer_backend.cc b/src/v/cluster/partition_balancer_backend.cc index 9c8c9e1d24b2e..aa60b0004ae6d 100644 --- a/src/v/cluster/partition_balancer_backend.cc +++ b/src/v/cluster/partition_balancer_backend.cc @@ -54,6 +54,7 @@ partition_balancer_backend::partition_balancer_backend( ss::sharded& topics_frontend, ss::sharded& members_frontend, config::binding&& availability_timeout, + config::binding&& decommission_timeout, config::binding max_disk_usage_percent, config::binding&& tick_interval, config::binding&& max_concurrent_actions, @@ -76,6 +77,7 @@ partition_balancer_backend::partition_balancer_backend( features::license_required_feature:: partition_auto_balancing_continuous>()) , _availability_timeout(std::move(availability_timeout)) + , _decommission_timeout(std::move(decommission_timeout)) , _max_disk_usage_percent(std::move(max_disk_usage_percent)) , _tick_interval(std::move(tick_interval)) , _max_concurrent_actions(std::move(max_concurrent_actions)) @@ -448,7 +450,8 @@ ss::future<> 
partition_balancer_backend::do_tick() { "violations: unavailable nodes: {}, full nodes: {}; " "nodes to rebalance count: {}; on demand rebalance requested: {}; " "updates in progress: {}; " - "action counts: reassignments: {}, cancellations: {}, failed: {}; " + "action counts: reassignments: {}, cancellations: {}, nodes to " + "decommission: {}, failed: {}; " "counts rebalancing finished: {}, force refresh health report: {}", _cur_term->last_status, _cur_term->last_violations.unavailable_nodes.size(), @@ -458,6 +461,7 @@ ss::future<> partition_balancer_backend::do_tick() { _state.topics().updates_in_progress().size(), plan_data.reassignments.size(), plan_data.cancellations.size(), + plan_data.decommissions.size(), plan_data.failed_actions_count, plan_data.counts_rebalancing_finished, _cur_term->_force_health_report_refresh); @@ -549,6 +553,24 @@ ss::future<> partition_balancer_backend::do_tick() { _cur_term->last_tick_in_progress_updates = moves_before + plan_data.cancellations.size() + plan_data.reassignments.size(); + + for (auto node_to_decommission : plan_data.decommissions) { + vlog( + clusterlog.info, + "submitting decommission on unresponsive node: {}", + node_to_decommission); + + auto decom_error = co_await _members_frontend.decommission_node( + node_to_decommission); + + if (decom_error) { + vlog( + clusterlog.warn, + "node: {}, failed to decommission with error: {}", + node_to_decommission, + decom_error); + } + } } partition_balancer_overview_reply partition_balancer_backend::overview() const { diff --git a/src/v/cluster/partition_balancer_backend.h b/src/v/cluster/partition_balancer_backend.h index 59d82c4618765..0279492de9ef5 100644 --- a/src/v/cluster/partition_balancer_backend.h +++ b/src/v/cluster/partition_balancer_backend.h @@ -43,6 +43,7 @@ class partition_balancer_backend { ss::sharded&, ss::sharded&, config::binding&& availability_timeout, + config::binding&& decommission_timeout, config::binding max_disk_usage_percent, config::binding&& 
tick_interval, config::binding&& max_concurrent_actions, @@ -102,6 +103,7 @@ class partition_balancer_backend { config::enum_property> _mode; config::binding _availability_timeout; + config::binding _decommission_timeout; config::binding _max_disk_usage_percent; config::binding _tick_interval; config::binding _max_concurrent_actions; diff --git a/src/v/cluster/partition_balancer_planner.cc b/src/v/cluster/partition_balancer_planner.cc index e0e508d9c3ddd..a89a0b2a88823 100644 --- a/src/v/cluster/partition_balancer_planner.cc +++ b/src/v/cluster/partition_balancer_planner.cc @@ -260,6 +260,9 @@ class partition_balancer_planner::request_context { _topic2node_counts; absl::node_hash_map _reassignments; absl::node_hash_map _force_reassignments; + // nodes to request to decommission, not nodes which are currently + // decommissioning + absl::flat_hash_set _nodes_to_be_decommed; size_t _failed_actions_count = 0; // we track missing partition size info separately as it requires force // refresh of health report @@ -378,6 +381,13 @@ void partition_balancer_planner::init_per_node_state( - std::chrono::duration_cast< model::timestamp_clock::duration>(time_since_last_seen)); + // get all nodes which are unresponsive enough to decom + if (time_since_last_seen > _config.decommission_timeout) { + if (!ctx.decommissioning_nodes.contains(id)) { + ctx._nodes_to_be_decommed.insert(id); + } + } + result.violations.unavailable_nodes.emplace_back( id, unavailable_since); } @@ -2171,9 +2181,14 @@ void partition_balancer_planner::request_context::collect_actions( result.reallocation_failures = std::move(_reallocation_failures); + result.decommissions.reserve(_nodes_to_be_decommed.size()); + std::ranges::move( + std::move(_nodes_to_be_decommed), + std::back_inserter(result.decommissions)); + if ( !result.cancellations.empty() || !result.reassignments.empty() - || result.counts_rebalancing_finished) { + || result.counts_rebalancing_finished || !result.decommissions.empty()) { result.status 
= status::actions_planned; } } @@ -2214,6 +2229,8 @@ partition_balancer_planner::plan_actions( change_reason::node_unavailable); co_await get_full_node_actions(ctx); co_await get_rack_constraint_repair_actions(ctx); + // get node decommission actions is already part of + // init_per_node_state } co_await get_counts_rebalancing_actions(ctx); co_await get_force_repair_actions(ctx); diff --git a/src/v/cluster/partition_balancer_planner.h b/src/v/cluster/partition_balancer_planner.h index 1841478937ad2..6aabe35eb627c 100644 --- a/src/v/cluster/partition_balancer_planner.h +++ b/src/v/cluster/partition_balancer_planner.h @@ -42,7 +42,12 @@ struct planner_config { double max_disk_usage_ratio; // Max number of actions that can be scheduled in one planning iteration size_t max_concurrent_actions; + // If a node is unresponsive for more than node_availability_timeout_sec, + // begin moving partitions off of that node std::chrono::seconds node_availability_timeout_sec; + // If a node is unresponsive for more than decommission timeout, launch a + // decommission operation against it + std::chrono::seconds decommission_timeout; // If the user manually requested a rebalance (not connected to node // addition) bool ondemand_rebalance_requested = false; @@ -86,6 +91,7 @@ class partition_balancer_planner { chunked_vector cancellations; chunked_hash_map reallocation_failures; + chunked_vector decommissions; bool counts_rebalancing_finished = false; size_t failed_actions_count = 0; status status = status::empty; From ffccb9241d8e0217f1921764e6920ade63b5d05c Mon Sep 17 00:00:00 2001 From: joe-redpanda Date: Thu, 11 Dec 2025 15:21:23 -0800 Subject: [PATCH 04/11] health_monitor_backend: wire in components adds config_i and node_status to health monitor backend. 
These will be used in future commits to create an auto decom status report --- src/v/cluster/controller.cc | 7 ++++++- src/v/cluster/health_monitor_backend.cc | 6 +++++- src/v/cluster/health_monitor_backend.h | 8 +++++++- 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc index 87c777bbcd37d..e421f0b5b3b09 100644 --- a/src/v/cluster/controller.cc +++ b/src/v/cluster/controller.cc @@ -27,6 +27,7 @@ #include "cluster/cluster_link/table.h" #include "cluster/cluster_recovery_table.h" #include "cluster/cluster_utils.h" +#include "cluster/config/cluster_config.h" #include "cluster/config_frontend.h" #include "cluster/controller_api.h" #include "cluster/controller_backend.h" @@ -727,7 +728,11 @@ ss::future<> controller::start( std::ref(_drain_manager), std::ref(_feature_table), std::ref(_partition_leaders), - std::ref(_tp_state)); + std::ref(_tp_state), + std::ref(_node_status_table), + ss::sharded_parameter([this]() { + return std::make_unique(_config_manager); + })); _leader_balancer = std::make_unique( _tp_state.local(), diff --git a/src/v/cluster/health_monitor_backend.cc b/src/v/cluster/health_monitor_backend.cc index 20414e105ef22..78421ecc790e9 100644 --- a/src/v/cluster/health_monitor_backend.cc +++ b/src/v/cluster/health_monitor_backend.cc @@ -67,7 +67,9 @@ health_monitor_backend::health_monitor_backend( ss::sharded& drain_manager, ss::sharded& feature_table, ss::sharded& partition_leaders_table, - ss::sharded& topic_table) + ss::sharded& topic_table, + ss::sharded& node_status_table, + std::unique_ptr config) : _raft0(std::move(raft0)) , _members(mt) , _connections(connections) @@ -78,6 +80,8 @@ health_monitor_backend::health_monitor_backend( , _feature_table(feature_table) , _partition_leaders_table(partition_leaders_table) , _topic_table(topic_table) + , _node_status_table(node_status_table) + , _config(std::move(config)) , _reports{ss::make_lw_shared()} , _local_monitor(local_monitor) , 
_self(_raft0->self().id()) {} diff --git a/src/v/cluster/health_monitor_backend.h b/src/v/cluster/health_monitor_backend.h index ddce0682c5853..4fd27ea8e2be5 100644 --- a/src/v/cluster/health_monitor_backend.h +++ b/src/v/cluster/health_monitor_backend.h @@ -11,9 +11,11 @@ */ #pragma once #include "absl/container/node_hash_map.h" +#include "cluster/config/config_i.h" #include "cluster/fwd.h" #include "cluster/health_monitor_types.h" #include "cluster/node/local_monitor.h" +#include "cluster/node_status_table.h" #include "cluster/notification.h" #include "features/feature_table.h" #include "model/metadata.h" @@ -71,7 +73,9 @@ class health_monitor_backend { ss::sharded&, ss::sharded&, ss::sharded&, - ss::sharded&); + ss::sharded&, + ss::sharded&, + std::unique_ptr); ss::future<> stop(); @@ -232,6 +236,8 @@ class health_monitor_backend { ss::sharded& _feature_table; ss::sharded& _partition_leaders_table; ss::sharded& _topic_table; + ss::sharded& _node_status_table; + std::unique_ptr _config; ss::lowres_clock::time_point _last_refresh; ss::lw_shared_ptr _refresh_request; From 3e3361f0f57456616420c7ddeceed0f1bff17d56 Mon Sep 17 00:00:00 2001 From: joe-redpanda Date: Wed, 10 Dec 2025 15:35:13 -0800 Subject: [PATCH 05/11] health_monitor_types: add auto_decom_status Adds a new struct to health_monitor_types: auto_decommission_status. This struct will report when a node hasn't been heard from long enough to be considered for automatic decommission Adds this struct to node_health_report and node_health_report_serde. Adds build fixes needed given the above. 
--- src/v/cluster/health_monitor_backend.cc | 13 +++- src/v/cluster/health_monitor_types.cc | 42 +++++++++-- src/v/cluster/health_monitor_types.h | 71 +++++++++++++++++-- src/v/cluster/tests/health_bench.cc | 6 +- src/v/cluster/tests/health_monitor_bench.cc | 7 +- src/v/cluster/tests/health_monitor_test.cc | 6 +- .../partition_balancer_planner_fixture.h | 3 +- .../partition_balancer_simulator_test.cc | 6 +- src/v/cluster/tests/randoms.h | 3 +- src/v/cluster/tests/serialization_rt_test.cc | 9 ++- 10 files changed, 141 insertions(+), 25 deletions(-) diff --git a/src/v/cluster/health_monitor_backend.cc b/src/v/cluster/health_monitor_backend.cc index 78421ecc790e9..47575b9b7fb03 100644 --- a/src/v/cluster/health_monitor_backend.cc +++ b/src/v/cluster/health_monitor_backend.cc @@ -20,6 +20,7 @@ #include "cluster/logger.h" #include "cluster/members_table.h" #include "cluster/node/local_monitor.h" +#include "cluster/node_status_table.h" #include "cluster/partition_manager.h" #include "cluster/partition_probe.h" #include "config/configuration.h" @@ -203,7 +204,11 @@ std::optional health_monitor_backend::build_node_report( } node_health_report ret{ - it->second->id, it->second->local_state, {}, it->second->drain_status}; + it->second->id, + it->second->local_state, + {}, + it->second->drain_status, + /*maybe_auto_decommission_status*/ std::nullopt}; ret.local_state.logical_version = features::feature_table::get_latest_logical_version(); ret.topics = filter_topic_status(it->second->topics, f.ntp_filters); @@ -898,7 +903,11 @@ health_monitor_backend::collect_current_node_health() { it->second.last_reply_timestamp = ss::lowres_clock::now(); co_return node_health_report{ - id, std::move(local_state), std::move(topics), std::move(drain_status)}; + id, + std::move(local_state), + std::move(topics), + std::move(drain_status), + /*maybe_auto_decommission_status*/ std::nullopt}; } ss::future> health_monitor_backend::get_current_node_health() { diff --git 
a/src/v/cluster/health_monitor_types.cc b/src/v/cluster/health_monitor_types.cc index d67917a52fb37..641a777d5ceff 100644 --- a/src/v/cluster/health_monitor_types.cc +++ b/src/v/cluster/health_monitor_types.cc @@ -75,14 +75,32 @@ std::ostream& operator<<(std::ostream& o, const node_state& s) { return o; } +std::ostream& operator<<(std::ostream& o, const auto_decommission_status& ads) { + fmt::print( + o, + "{{config_version: {}, nodes_past_auto_decom_timeout: {}}}", + ads.configuration_version, + ads.nodes_past_auto_decom_timeout); + return o; +} + +bool operator==( + const auto_decommission_status& a, const auto_decommission_status& b) { + return a.configuration_version == b.configuration_version + && std::ranges::equal( + a.nodes_past_auto_decom_timeout, b.nodes_past_auto_decom_timeout); +} + node_health_report::node_health_report( model::node_id id, node::local_state local_state, chunked_vector topics_vec, - std::optional drain_status) + std::optional drain_status, + std::optional maybe_auto_decommission_status) : id(id) , local_state(std::move(local_state)) - , drain_status(drain_status) { + , drain_status(drain_status) + , maybe_auto_decommission_status(std::move(maybe_auto_decommission_status)) { topics.reserve(topics_vec.size()); for (auto& topic : topics_vec) { topics.emplace( @@ -91,7 +109,8 @@ node_health_report::node_health_report( } node_health_report node_health_report::copy() const { - node_health_report ret{id, local_state, {}, drain_status}; + node_health_report ret{ + id, local_state, {}, drain_status, maybe_auto_decommission_status}; ret.topics.reserve(topics.bucket_count()); for (const auto& [tp_ns, partitions] : topics) { ret.topics.emplace(tp_ns, copy_partition_statuses(partitions)); @@ -104,7 +123,12 @@ std::ostream& operator<<(std::ostream& o, const node_health_report& r) { } node_health_report_serde::node_health_report_serde(const node_health_report& hr) - : node_health_report_serde(hr.id, hr.local_state, {}, hr.drain_status) { + : 
node_health_report_serde( + hr.id, + hr.local_state, + /* topics */ {}, + hr.drain_status, + hr.maybe_auto_decommission_status) { topics.reserve(hr.topics.size()); for (const auto& [tp_ns, partitions] : hr.topics) { topics.emplace_back(tp_ns, copy_to_vector(partitions)); @@ -154,11 +178,13 @@ partition_statuses_map_t copy_to_map(const partition_statuses_t& ps_vec) { std::ostream& operator<<(std::ostream& o, const node_health_report_serde& r) { fmt::print( o, - "{{id: {}, topics: {}, local_state: {}, drain_status: {}}}", + "{{id: {}, topics: {}, local_state: {}, drain_status: {}, " + "maybe_auto_decommission_status {}}}", r.id, r.topics, r.local_state, - r.drain_status); + r.drain_status, + r.maybe_auto_decommission_status); return o; } @@ -171,7 +197,9 @@ bool operator==( a.topics.cbegin(), a.topics.cend(), b.topics.cbegin(), - b.topics.cend()); + b.topics.cend()) + && a.maybe_auto_decommission_status + == b.maybe_auto_decommission_status; } std::ostream& operator<<(std::ostream& o, const cluster_health_report& r) { diff --git a/src/v/cluster/health_monitor_types.h b/src/v/cluster/health_monitor_types.h index ddcedda94eccb..faf0d8e557ee8 100644 --- a/src/v/cluster/health_monitor_types.h +++ b/src/v/cluster/health_monitor_types.h @@ -15,6 +15,7 @@ #include "cluster/drain_manager.h" #include "cluster/errc.h" #include "cluster/node/types.h" +#include "cluster/types.h" #include "container/chunked_hash_map.h" #include "model/fundamental.h" #include "model/metadata.h" @@ -196,6 +197,47 @@ struct topic_status auto serde_fields() { return std::tie(tp_ns, partitions); } }; +/** + * Status for the automatic decommissioning of dead nodes + */ + +struct auto_decommission_status_data { + // ensures that the auto decommission configuration is the same on the + // source node and controller + config_version configuration_version{config_version_unset}; + // a list of the node_ids which have exceeded the auto decommission timeout + std::vector nodes_past_auto_decom_timeout; +}; + 
+struct auto_decommission_status + : serde::envelope< + auto_decommission_status, + serde::version<0>, + serde::compat_version<0>> + , auto_decommission_status_data { + static constexpr int8_t current_version = 0; + + using auto_decommission_status_data::auto_decommission_status_data; + + // NOLINTNEXTLINE hicpp-explicit-conversions + auto_decommission_status(auto_decommission_status_data data) noexcept + : auto_decommission_status_data{std::move(data)} { + static_assert( + sizeof(auto_decommission_status_data) + == sizeof(auto_decommission_status)); + } + + friend std::ostream& + operator<<(std::ostream&, const auto_decommission_status&); + + auto serde_fields() { + return std::tie(configuration_version, nodes_past_auto_decom_timeout); + } + + friend bool operator==( + const auto_decommission_status& a, const auto_decommission_status& b); +}; + /** * Node health report is collected built based on node local state at given * instance of time @@ -211,12 +253,14 @@ struct node_health_report { node::local_state local_state; topics_t topics; std::optional drain_status; + std::optional maybe_auto_decommission_status; node_health_report( model::node_id, node::local_state, chunked_vector, - std::optional); + std::optional, + std::optional); node_health_report copy() const; @@ -240,9 +284,15 @@ struct node_health_report_serde node::local_state local_state; chunked_vector topics; std::optional drain_status; + std::optional maybe_auto_decommission_status; auto serde_fields() { - return std::tie(id, local_state, topics, drain_status); + return std::tie( + id, + local_state, + topics, + drain_status, + maybe_auto_decommission_status); } node_health_report_serde() = default; @@ -251,14 +301,22 @@ struct node_health_report_serde model::node_id id, node::local_state local_state, chunked_vector topics, - std::optional drain_status) + std::optional drain_status, + std::optional maybe_auto_decommission_status) : id(id) , local_state(std::move(local_state)) , 
topics(std::move(topics)) - , drain_status(drain_status) {} + , drain_status(drain_status) + , maybe_auto_decommission_status( + std::move(maybe_auto_decommission_status)) {} node_health_report_serde copy() const { - return {id, local_state, topics.copy(), drain_status}; + return { + id, + local_state, + topics.copy(), + drain_status, + maybe_auto_decommission_status}; } explicit node_health_report_serde(const node_health_report& hr); @@ -268,7 +326,8 @@ struct node_health_report_serde id, std::move(local_state), std::move(topics), - std::move(drain_status)}; + drain_status, + std::move(maybe_auto_decommission_status)}; } friend std::ostream& diff --git a/src/v/cluster/tests/health_bench.cc b/src/v/cluster/tests/health_bench.cc index 0fbfbeecc57f5..57672439bfbb8 100644 --- a/src/v/cluster/tests/health_bench.cc +++ b/src/v/cluster/tests/health_bench.cc @@ -102,7 +102,11 @@ struct health_bench : health_report_accessor { for (auto& [node_id, topics] : node_topics) { reports[node_id] = ss::make_lw_shared( - node_id, node::local_state{}, std::move(topics), std::nullopt); + node_id, + node::local_state{}, + std::move(topics), + /* drain_status */ std::nullopt, + /* auto_decommission_status */ std::nullopt); } perf_tests::start_measuring_time(); diff --git a/src/v/cluster/tests/health_monitor_bench.cc b/src/v/cluster/tests/health_monitor_bench.cc index 4143e1f462226..0bc1c0b334ebf 100644 --- a/src/v/cluster/tests/health_monitor_bench.cc +++ b/src/v/cluster/tests/health_monitor_bench.cc @@ -63,7 +63,12 @@ make_node_health_report(size_t num_topics, size_t partitions_per_topic) { topics.push_back(make_topic_status(i, partitions_per_topic)); } - return {id, local_state, std::move(topics), std::nullopt}; + return { + id, + local_state, + std::move(topics), + /* drain status */ std::nullopt, + /* auto_decommission_status */ std::nullopt}; } template diff --git a/src/v/cluster/tests/health_monitor_test.cc b/src/v/cluster/tests/health_monitor_test.cc index 
e91bdb0c92bcf..22392b1e6d928 100644 --- a/src/v/cluster/tests/health_monitor_test.cc +++ b/src/v/cluster/tests/health_monitor_test.cc @@ -374,7 +374,8 @@ make_nhr(int nid, const std::vector& statuses) { node_id(nid), node::local_state{}, chunked_vector(statuses.begin(), statuses.end()), - std::nullopt}; + /* drain_status */ std::nullopt, + /* maybe_auto_decommission_status */ std::nullopt}; }; struct node_and_status { @@ -441,7 +442,8 @@ FIXTURE_TEST(test_aggregate, health_report_unit) { model::node_id(0), node::local_state{}, chunked_vector{}, - std::nullopt)}}; + /* drain_status */ std::nullopt, + /* maybe_auto_decommission_status */ std::nullopt)}}; { // empty input, empty report diff --git a/src/v/cluster/tests/partition_balancer_planner_fixture.h b/src/v/cluster/tests/partition_balancer_planner_fixture.h index bb746a911c135..da898691f45fc 100644 --- a/src/v/cluster/tests/partition_balancer_planner_fixture.h +++ b/src/v/cluster/tests/partition_balancer_planner_fixture.h @@ -410,7 +410,8 @@ struct partition_balancer_planner_fixture { model::node_id(i), local_state, std::move(node_topics), - std::nullopt)); + /* drain status */ std::nullopt, + /* maybe_auto_decommission_status */ std::nullopt)); } return health_report; diff --git a/src/v/cluster/tests/partition_balancer_simulator_test.cc b/src/v/cluster/tests/partition_balancer_simulator_test.cc index 6888a885a88e6..983db77c3255b 100644 --- a/src/v/cluster/tests/partition_balancer_simulator_test.cc +++ b/src/v/cluster/tests/partition_balancer_simulator_test.cc @@ -740,7 +740,11 @@ class partition_balancer_sim_fixture { return ss::make_foreign( ss::make_lw_shared( - id, local_state, std::move(topics), std::nullopt)); + id, + local_state, + std::move(topics), + /* drain_status */ std::nullopt, + /*maybe_auto_decommission_status */ std::nullopt)); } }; diff --git a/src/v/cluster/tests/randoms.h b/src/v/cluster/tests/randoms.h index dba895ac84357..4aee366f9de14 100644 --- a/src/v/cluster/tests/randoms.h +++ 
b/src/v/cluster/tests/randoms.h @@ -95,7 +95,8 @@ inline node_health_report random_node_health_report() { tests::random_named_int(), node::random_local_state(), tests::random_chunked_vector(random_topic_status), - random_ds}; + random_ds, + /* maybe_auto_decommission_status */ std::nullopt}; } inline cluster_health_report random_cluster_health_report() { diff --git a/src/v/cluster/tests/serialization_rt_test.cc b/src/v/cluster/tests/serialization_rt_test.cc index 785edaa99087f..636affc67ad20 100644 --- a/src/v/cluster/tests/serialization_rt_test.cc +++ b/src/v/cluster/tests/serialization_rt_test.cc @@ -767,7 +767,8 @@ cluster::cluster_health_report random_cluster_health_report() { tests::random_named_int(), random_local_state(), std::move(topics), - random_drain_status()); + random_drain_status(), + /*maybe_auto_decommission_status*/ std::nullopt); // Reduce to an ADL-encodable state report.local_state.cache_disk = std::nullopt; @@ -1659,7 +1660,8 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) { tests::random_named_int(), random_local_state(), std::move(topics), - random_drain_status()); + random_drain_status(), + /*maybe_auto_decommission_status*/ std::nullopt); // Squash local_state to a form that ADL represents, since we will // test ADL roundtrip. @@ -1676,7 +1678,8 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) { tests::random_named_int(), random_local_state(), std::move(topics), - random_drain_status()); + random_drain_status(), + /*maybe_auto_decommission_status*/ std::nullopt); // Squash to ADL-understood disk state report.local_state.cache_disk = report.local_state.data_disk; From 69e9da9ecd6ebed2ee6e4264927fc96c9887f667 Mon Sep 17 00:00:00 2001 From: joe-redpanda Date: Thu, 11 Dec 2025 14:57:35 -0800 Subject: [PATCH 06/11] health_montior_backend: implement auto decom Implements calculation of nodes past auto decom time in health montior backend. 
All nodes which are past the automatic decommission time will be added to the auto_decommission_status field in the health report. A node will be added if it was last seen prior to now - decom time or if not seen ever this nodes uptime is prior to now - decom time --- src/v/cluster/health_monitor_backend.cc | 80 ++++++++++++++++++++++++- src/v/cluster/health_monitor_backend.h | 17 ++++++ 2 files changed, 94 insertions(+), 3 deletions(-) diff --git a/src/v/cluster/health_monitor_backend.cc b/src/v/cluster/health_monitor_backend.cc index 47575b9b7fb03..a2873ad8373a8 100644 --- a/src/v/cluster/health_monitor_backend.cc +++ b/src/v/cluster/health_monitor_backend.cc @@ -23,6 +23,7 @@ #include "cluster/node_status_table.h" #include "cluster/partition_manager.h" #include "cluster/partition_probe.h" +#include "cluster/types.h" #include "config/configuration.h" #include "config/property.h" #include "container/chunked_hash_map.h" @@ -48,6 +49,7 @@ #include #include +#include #include #include #include @@ -208,7 +210,7 @@ std::optional health_monitor_backend::build_node_report( it->second->local_state, {}, it->second->drain_status, - /*maybe_auto_decommission_status*/ std::nullopt}; + it->second->maybe_auto_decommission_status}; ret.local_state.logical_version = features::feature_table::get_latest_logical_version(); ret.topics = filter_topic_status(it->second->topics, f.ntp_filters); @@ -897,6 +899,7 @@ health_monitor_backend::collect_current_node_health() { auto drain_status = co_await _drain_manager.local().status(); auto topics = co_await collect_topic_status(); + auto maybe_auto_decommission_status = collect_auto_decommission_status(); auto [it, _] = _status.try_emplace(id); it->second.is_alive = alive::yes; @@ -906,8 +909,8 @@ health_monitor_backend::collect_current_node_health() { id, std::move(local_state), std::move(topics), - std::move(drain_status), - /*maybe_auto_decommission_status*/ std::nullopt}; + drain_status, + std::move(maybe_auto_decommission_status)}; } 
ss::future> health_monitor_backend::get_current_node_health() { @@ -1037,6 +1040,7 @@ reports_acc_t reduce_reports_map(reports_acc_t acc, shard_report shard_report) { return acc; } } // namespace + ss::future> health_monitor_backend::collect_topic_status() { auto reports_map = co_await _partition_manager.map_reduce0( @@ -1053,6 +1057,76 @@ health_monitor_backend::collect_topic_status() { co_return topics; } +namespace { +// small helper for getting the boot time of the redpanda application +ss::lowres_clock::time_point get_boot_time() { + return rpc::clock_type::now() + - std::chrono::duration_cast( + ss::engine().uptime()); +} +} // namespace + +std::optional +health_monitor_backend::collect_auto_decommission_status() { + const auto timeout_duration + = config::shard_local_cfg() + .partition_autobalancing_node_autodecommission_time(); + + auto maybe_config_version = _config->get_config(); + vassert( + maybe_config_version, + "current implementation of config_i::get_config should never return an " + "error"); + auto config_version = *maybe_config_version; + // bounce to static method + return do_collect_auto_decommission_status( + {.timeout_duration = timeout_duration, + .now = rpc::clock_type::now(), + .default_last_seen = get_boot_time(), + .nodes = _members.local().node_ids(), + .node_status_getter = get_node_status_t{[this](model::node_id node_id) { + return _node_status_table.local().get_node_status(node_id); + }}, + .current_config_version = config_version}); +} + +std::optional +health_monitor_backend::do_collect_auto_decommission_status( + const do_collect_auto_decommission_status_params& params) { + // gather the list of all nodes past the configured auto decom time + // package that with the current configuration version s.t. 
a decision is + // guaranteed to be made with the same decom timeout + + auto all_past_last_seen + = params.nodes | // get last seen, if missing clock min + std::ranges::views::transform( + [¶ms](const model::node_id node) -> node_status { + return params.node_status_getter(node).value_or( + node_status{ + .node_id = node, .last_seen = params.default_last_seen}); + }) + | // filter to node statuses that are above timeout duration + std::ranges::views::filter([¶ms](const node_status status) { + return params.now - status.last_seen > params.timeout_duration; + }) + | // grab the node ids + std::ranges::views::transform( + [](const node_status status) { return status.node_id; }) + | // to vector + std::ranges::to>(); + + if (all_past_last_seen.empty()) [[likely]] { + // the overwhelming majority of the time the list should be empty + return std::nullopt; + } + + auto_decommission_status status{ + {.configuration_version = params.current_config_version, + .nodes_past_auto_decom_timeout = std::move(all_past_last_seen)}}; + + return status; +} + std::chrono::milliseconds health_monitor_backend::max_metadata_age() { return config::shard_local_cfg().health_monitor_max_metadata_age(); } diff --git a/src/v/cluster/health_monitor_backend.h b/src/v/cluster/health_monitor_backend.h index 4fd27ea8e2be5..5f73cfdaca94e 100644 --- a/src/v/cluster/health_monitor_backend.h +++ b/src/v/cluster/health_monitor_backend.h @@ -18,6 +18,7 @@ #include "cluster/node_status_table.h" #include "cluster/notification.h" #include "features/feature_table.h" +#include "model/fundamental.h" #include "model/metadata.h" #include "rpc/fwd.h" #include "ssx/semaphore.h" @@ -169,6 +170,22 @@ class health_monitor_backend { ss::future> collect_topic_status(); + // get the status info of all nodes which are past auto decommission timeout + std::optional collect_auto_decommission_status(); + using get_node_status_t + = ss::noncopyable_function(model::node_id)>; + struct 
do_collect_auto_decommission_status_params { + std::chrono::seconds timeout_duration; + rpc::clock_type::time_point now; + rpc::clock_type::time_point default_last_seen; + std::vector nodes; + get_node_status_t node_status_getter; + config_version current_config_version; + }; + static std::optional + do_collect_auto_decommission_status( + const do_collect_auto_decommission_status_params& params); + result process_node_reply(model::node_id, result); From b0df355d27184ed5529553f116fcb39d5b0a0c56 Mon Sep 17 00:00:00 2001 From: joe-redpanda Date: Mon, 15 Dec 2025 16:25:22 -0800 Subject: [PATCH 07/11] partition_balancer_state: add config_i Adds config_i to partition_balancer_state alongside wiring in the sharded service based implementation in the controller. --- src/v/cluster/controller.cc | 5 ++++- src/v/cluster/partition_balancer_state.cc | 4 +++- src/v/cluster/partition_balancer_state.h | 8 +++++++- src/v/cluster/tests/BUILD | 2 ++ src/v/cluster/tests/partition_balancer_planner_fixture.h | 4 +++- src/v/cluster/tests/topic_table_fixture.h | 4 +++- 6 files changed, 22 insertions(+), 5 deletions(-) diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc index e421f0b5b3b09..040c8b7ce5e52 100644 --- a/src/v/cluster/controller.cc +++ b/src/v/cluster/controller.cc @@ -228,7 +228,10 @@ ss::future<> controller::wire_up() { std::ref(_tp_state), std::ref(_members_table), std::ref(_partition_allocator), - std::ref(_node_status_table)); + std::ref(_node_status_table), + ss::sharded_parameter([this] { + return std::make_unique(_config_manager); + })); }) .then([this] { return _shard_placement.start( diff --git a/src/v/cluster/partition_balancer_state.cc b/src/v/cluster/partition_balancer_state.cc index f5931fbcaf5ad..7b5f7747d3b53 100644 --- a/src/v/cluster/partition_balancer_state.cc +++ b/src/v/cluster/partition_balancer_state.cc @@ -29,11 +29,13 @@ partition_balancer_state::partition_balancer_state( ss::sharded& topic_table, ss::sharded& members_table, 
ss::sharded& pa, - ss::sharded& nst) + ss::sharded& nst, + std::unique_ptr config) : _topic_table(topic_table.local()) , _members_table(members_table.local()) , _partition_allocator(pa.local()) , _node_status(nst.local()) + , _config(std::move(config)) , _probe(*this) {} void partition_balancer_state::handle_ntp_move_begin_or_cancel( diff --git a/src/v/cluster/partition_balancer_state.h b/src/v/cluster/partition_balancer_state.h index 89c0a25c64aa8..d188c4e8f3d1a 100644 --- a/src/v/cluster/partition_balancer_state.h +++ b/src/v/cluster/partition_balancer_state.h @@ -12,7 +12,9 @@ #include "absl/container/btree_set.h" #include "absl/container/flat_hash_set.h" +#include "cluster/config/config_i.h" #include "cluster/fwd.h" +#include "cluster/types.h" #include "metrics/metrics.h" #include "model/fundamental.h" #include "model/metadata.h" @@ -20,6 +22,8 @@ #include +#include + namespace cluster { /// Class that stores state that is needed for functioning of the partition @@ -31,7 +35,8 @@ class partition_balancer_state { ss::sharded&, ss::sharded&, ss::sharded&, - ss::sharded&); + ss::sharded&, + std::unique_ptr); topic_table& topics() const { return _topic_table; } @@ -102,6 +107,7 @@ class partition_balancer_state { members_table& _members_table; partition_allocator& _partition_allocator; node_status_table& _node_status; + std::unique_ptr _config; absl::btree_set _ntps_with_broken_rack_constraint; // revision increment to be paired with all updates // _ntps_with_broken_rack_constraint set. Relied upon by the iterator. 
diff --git a/src/v/cluster/tests/BUILD b/src/v/cluster/tests/BUILD index 779cf443599e4..021afa561d6b9 100644 --- a/src/v/cluster/tests/BUILD +++ b/src/v/cluster/tests/BUILD @@ -68,6 +68,7 @@ redpanda_test_cc_library( ":utils", "//src/v/base", "//src/v/cluster", + "//src/v/cluster/config/test:test_config", "//src/v/config", "//src/v/features", "//src/v/model", @@ -149,6 +150,7 @@ redpanda_test_cc_library( ":utils", "//src/v/base", "//src/v/cluster", + "//src/v/cluster/config/test:test_config", "@boost//:test", ], ) diff --git a/src/v/cluster/tests/partition_balancer_planner_fixture.h b/src/v/cluster/tests/partition_balancer_planner_fixture.h index da898691f45fc..6daace22e844d 100644 --- a/src/v/cluster/tests/partition_balancer_planner_fixture.h +++ b/src/v/cluster/tests/partition_balancer_planner_fixture.h @@ -13,6 +13,7 @@ #include "base/units.h" #include "cluster/commands.h" +#include "cluster/config/test/test_config.h" #include "cluster/data_migrated_resources.h" #include "cluster/health_monitor_types.h" #include "cluster/members_table.h" @@ -109,7 +110,8 @@ struct controller_workers { std::ref(table), std::ref(members), std::ref(allocator), - std::ref(node_status_table)) + std::ref(node_status_table), + ss::sharded_parameter(cluster::test::test_config::make_default)) .get(); } diff --git a/src/v/cluster/tests/topic_table_fixture.h b/src/v/cluster/tests/topic_table_fixture.h index a61a1cb2ec515..a4770b0016eb2 100644 --- a/src/v/cluster/tests/topic_table_fixture.h +++ b/src/v/cluster/tests/topic_table_fixture.h @@ -12,6 +12,7 @@ #pragma once #include "base/units.h" +#include "cluster/config/test/test_config.h" #include "cluster/data_migrated_resources.h" #include "cluster/members_table.h" #include "cluster/node_status_table.h" @@ -76,7 +77,8 @@ struct topic_table_fixture { std::ref(table), std::ref(members), std::ref(allocator), - std::ref(node_status)) + std::ref(node_status), + ss::sharded_parameter(cluster::test::test_config::make_default)) .get(); } From 
103fbdd59362b1727750231d3f3a97972ead3186 Mon Sep 17 00:00:00 2001 From: joe-redpanda Date: Mon, 15 Dec 2025 16:25:22 -0800 Subject: [PATCH 08/11] partition_balancer_state: add get_config_version adds a passthru method to get the config_version from config_i --- src/v/cluster/partition_balancer_planner.cc | 149 ++++++++++++++++++-- src/v/cluster/partition_balancer_planner.h | 19 +++ src/v/cluster/partition_balancer_state.cc | 12 ++ src/v/cluster/partition_balancer_state.h | 2 + 4 files changed, 172 insertions(+), 10 deletions(-) diff --git a/src/v/cluster/partition_balancer_planner.cc b/src/v/cluster/partition_balancer_planner.cc index a89a0b2a88823..326c524a8ded7 100644 --- a/src/v/cluster/partition_balancer_planner.cc +++ b/src/v/cluster/partition_balancer_planner.cc @@ -12,6 +12,7 @@ #include "base/vlog.h" #include "cluster/cluster_utils.h" +#include "cluster/health_monitor_types.h" #include "cluster/logger.h" #include "cluster/members_table.h" #include "cluster/node_status_table.h" @@ -23,6 +24,8 @@ #include "cluster/scheduling/types.h" #include "cluster/types.h" #include "container/chunked_hash_map.h" +#include "model/fundamental.h" +#include "model/metadata.h" #include "model/namespace.h" #include "random/generators.h" #include "ssx/sformat.h" @@ -33,6 +36,8 @@ #include #include +#include +#include namespace cluster { @@ -109,6 +114,7 @@ class partition_balancer_planner::request_context { absl::flat_hash_set all_unavailable_nodes; absl::flat_hash_set timed_out_unavailable_nodes; absl::flat_hash_set decommissioning_nodes; + absl::flat_hash_set maintenance_mode_nodes; absl::flat_hash_map node_disk_reports; ss::future<> for_each_partition( @@ -326,6 +332,7 @@ void partition_balancer_planner::init_per_node_state( request_context& ctx, plan_data& result) { const auto now = rpc::clock_type::now(); + for (const auto& [id, broker] : _state.members().nodes()) { if ( broker.state.get_membership_state() @@ -341,6 +348,13 @@ void 
partition_balancer_planner::init_per_node_state( vlog(clusterlog.debug, "node {}: decommissioning", id); ctx.decommissioning_nodes.insert(id); } + + if ( + broker.state.get_maintenance_state() + == model::maintenance_state::active) { + ctx.maintenance_mode_nodes.insert(id); + } + auto node_status = _state.node_status().get_node_status(id); // node status is not yet available, wait for it to be updated if (!node_status) { @@ -381,13 +395,6 @@ void partition_balancer_planner::init_per_node_state( - std::chrono::duration_cast< model::timestamp_clock::duration>(time_since_last_seen)); - // get all nodes which are unresponsive enough to decom - if (time_since_last_seen > _config.decommission_timeout) { - if (!ctx.decommissioning_nodes.contains(id)) { - ctx._nodes_to_be_decommed.insert(id); - } - } - result.violations.unavailable_nodes.emplace_back( id, unavailable_since); } @@ -2149,6 +2156,124 @@ partition_balancer_planner::get_force_repair_actions(request_context& ctx) { } } +absl::flat_hash_set +partition_balancer_planner::do_get_auto_decommission_actions( + const do_get_auto_decommission_actions_params& params) noexcept { + // rule: a quorum of nodes in the cluster must vote that a node has passed + // the current auto decommission timeout + size_t quorum = params.cluster_members.size() / 2 + 1; + + // given a map from the health report on node_id -> decom_report, filter to + // 1. only cluster members + // 2. only reports that have a value + // 3. 
only reports that have the current configuration version + // - this checks that we are working with the current decommission + // timeout + auto decom_reports + = params.node_to_decom + | std::ranges::views::filter( + [¶ms](const auto& node_and_decom_report) { + return + // only cluster members + params.cluster_members.contains(node_and_decom_report.first) + // only reports with values + && node_and_decom_report.second.get().has_value() + // only reports with the current configuration version + && node_and_decom_report.second.get() + .value() + .configuration_version + == params.current_config; + }) + | // quick type convert from ptr> -> ref_wrapper + std::ranges::views::transform( + [](const auto& node_and_decom_report) + -> std::reference_wrapper { + return node_and_decom_report.second.get().value(); + }) + | std::ranges::to>>(); + + if (decom_reports.size() < quorum) { + // not enough health reports to continue + return {}; + } + + // bin the filtered reports on decom vote count by node_id + std::unordered_map node_to_decom_vote_counts{}; + for (const auto& decom_report : decom_reports) { + for (const model::node_id candidate_node : + decom_report.get().nodes_past_auto_decom_timeout) { + node_to_decom_vote_counts[candidate_node] += 1; + } + } + + // filter out invalid choices which are + // 1. nodes without enough votes + // 2. nodes that are already decommissioning + // 3. 
nodes that are in maintanence mode + // - if a customer has to take a node down for over the auto decom + // timeout and doesn't want it ejected from the cluster, they should + // put it in maintanence mode + auto nodes_to_decommission + = node_to_decom_vote_counts | // filter invalid choices + std::ranges::views::filter( + [quorum, ¶ms](const auto& node_and_count) { + const auto& [node_id, decom_vote_count] = node_and_count; + return + // no nodes with less than a quorum of votes + decom_vote_count >= quorum + // no nodes that are already decommissioning + && !params.decommissioning_nodes.contains(node_id) + // no nodes in maintenance_mode + && !params.maintenance_mode_nodes.contains(node_id); + }) + | // shave it down to only node id + std::ranges::views::transform( + [](const auto& node_and_count) { return node_and_count.first; }) + | std::ranges::to>(); + + return nodes_to_decommission; +} + +void partition_balancer_planner::get_auto_decommission_actions( + request_context& ctx, const cluster_health_report& health_report) { + absl::flat_hash_set member_nodes{}; + for (auto node : ctx.all_nodes) { + member_nodes.insert(node); + } + + auto_decom_ref_map node_to_decom{}; + for (const auto& report : health_report.node_reports) { + vlog( + clusterlog.debug, + "auto decom report from node: {}, with decom status: {}", + report->id, + report->maybe_auto_decommission_status); + node_to_decom.emplace( + report->id, report->maybe_auto_decommission_status); + } + + ctx._nodes_to_be_decommed = do_get_auto_decommission_actions( + {.node_to_decom = node_to_decom, + .cluster_members = member_nodes, + .decommissioning_nodes = ctx.decommissioning_nodes, + .maintenance_mode_nodes = ctx.maintenance_mode_nodes, + .current_config = ctx.state().get_config_version()}); + + if (ctx._nodes_to_be_decommed.empty()) { + vlog(clusterlog.debug, "no auto decommission actions found"); + return; + } + + ss::sstring auto_decommission_log_line + = "found nodes to auto decommission: "; + for (auto 
node_id : ctx._nodes_to_be_decommed) { + auto_decommission_log_line += fmt::format("{}, ", node_id); + } + + vlog(clusterlog.info, "{}", auto_decommission_log_line); +} + void partition_balancer_planner::request_context::collect_actions( partition_balancer_planner::plan_data& result) { result.reassignments.reserve(_reassignments.size()); @@ -2206,12 +2331,18 @@ partition_balancer_planner::plan_actions( co_return result; } + if (ctx.config().mode == model::partition_autobalancing_mode::continuous) { + get_auto_decommission_actions(ctx, health_report); + } + + // early exit if theres nothing to be done if ( result.violations.is_empty() && ctx.decommissioning_nodes.empty() && _state.ntps_with_broken_rack_constraint().empty() && _state.nodes_to_rebalance().empty() && _state.topics().partitions_to_force_recover().empty() - && !_config.ondemand_rebalance_requested) { + && !_config.ondemand_rebalance_requested + && ctx._nodes_to_be_decommed.empty()) { result.status = status::empty; co_return result; } @@ -2229,8 +2360,6 @@ partition_balancer_planner::plan_actions( change_reason::node_unavailable); co_await get_full_node_actions(ctx); co_await get_rack_constraint_repair_actions(ctx); - // get node decommission actions is already part of - // init_per_node_state } co_await get_counts_rebalancing_actions(ctx); co_await get_force_repair_actions(ctx); diff --git a/src/v/cluster/partition_balancer_planner.h b/src/v/cluster/partition_balancer_planner.h index 6aabe35eb627c..e23afc73c47ab 100644 --- a/src/v/cluster/partition_balancer_planner.h +++ b/src/v/cluster/partition_balancer_planner.h @@ -22,6 +22,7 @@ #include namespace cluster { +class partition_balancer_planner_accessor; enum ntp_reassignment_type : int8_t { regular, force }; @@ -129,13 +130,31 @@ class partition_balancer_planner { static ss::future<> get_full_node_actions(request_context&); static ss::future<> get_counts_rebalancing_actions(request_context&); static ss::future<> 
get_force_repair_actions(request_context&); + static void get_auto_decommission_actions( + request_context&, const cluster_health_report& health_report); static size_t calculate_full_disk_partition_move_priority( model::node_id, const reassignable_partition&, const request_context&); + using auto_decom_ref + = std::reference_wrapper>; + using auto_decom_ref_map + = absl::flat_hash_map; + struct do_get_auto_decommission_actions_params { + auto_decom_ref_map node_to_decom; + absl::flat_hash_set cluster_members; + absl::flat_hash_set decommissioning_nodes; + absl::flat_hash_set maintenance_mode_nodes; + config_version current_config; + }; + static absl::flat_hash_set do_get_auto_decommission_actions( + const do_get_auto_decommission_actions_params& params) noexcept; + planner_config _config; partition_balancer_state& _state; partition_allocator& _partition_allocator; + + friend class ::cluster::partition_balancer_planner_accessor; }; } // namespace cluster diff --git a/src/v/cluster/partition_balancer_state.cc b/src/v/cluster/partition_balancer_state.cc index 7b5f7747d3b53..adc3ec9c18af5 100644 --- a/src/v/cluster/partition_balancer_state.cc +++ b/src/v/cluster/partition_balancer_state.cc @@ -165,4 +165,16 @@ void partition_balancer_state::probe::setup_metrics( }); } +config_version partition_balancer_state::get_config_version() const { + auto maybe_config = _config->get_config(); + if (!maybe_config) { + vlog( + clusterlog.error, + "failed to get config version with error: {}", + maybe_config.error()); + } + vassert(maybe_config, "config version should not return an error"); + return *maybe_config; +} + } // namespace cluster diff --git a/src/v/cluster/partition_balancer_state.h b/src/v/cluster/partition_balancer_state.h index d188c4e8f3d1a..d95afa21ae308 100644 --- a/src/v/cluster/partition_balancer_state.h +++ b/src/v/cluster/partition_balancer_state.h @@ -91,6 +91,8 @@ class partition_balancer_state { ss::future<> apply_snapshot(const controller_snapshot&); + 
config_version get_config_version() const; + private: struct probe { explicit probe(const partition_balancer_state&); From e29f3008075bbbbe70a8d4b4be2834fd7fa6a229 Mon Sep 17 00:00:00 2001 From: joe-redpanda Date: Wed, 17 Dec 2025 08:59:39 -0800 Subject: [PATCH 09/11] partition_balancer_autodecom_test: add unit tests Adds unit tests for the partition_balancer_planner's logic of determining when to auto decommisssion a node. Checks are: 1. check that our calculation of quorum is correct 2. dont double decommission nodes 3. dont decommission nodes in maintenance mode 4. ignore votes with a different config_version than the leader 5. ignore votes from nodes which aren't cluster members 6. support multiple decoms so long as quorum is alive --- src/v/cluster/tests/BUILD | 18 + .../partition_balancer_autodecom_test.cc | 532 ++++++++++++++++++ 2 files changed, 550 insertions(+) create mode 100644 src/v/cluster/tests/partition_balancer_autodecom_test.cc diff --git a/src/v/cluster/tests/BUILD b/src/v/cluster/tests/BUILD index 021afa561d6b9..292bd088fcd24 100644 --- a/src/v/cluster/tests/BUILD +++ b/src/v/cluster/tests/BUILD @@ -1316,3 +1316,21 @@ redpanda_cc_gtest( "@googletest//:gtest", ], ) + +redpanda_cc_gtest( + name = "partition_balancer_autodecom_test", + timeout = "short", + srcs = [ + "partition_balancer_autodecom_test.cc", + ], + deps = [ + "//src/v/cluster", + "//src/v/config", + "//src/v/container:chunked_vector", + "//src/v/model", + "//src/v/random:generators", + "//src/v/test_utils:gtest", + "@googletest//:gtest", + "@seastar", + ], +) diff --git a/src/v/cluster/tests/partition_balancer_autodecom_test.cc b/src/v/cluster/tests/partition_balancer_autodecom_test.cc new file mode 100644 index 0000000000000..6deaddb467659 --- /dev/null +++ b/src/v/cluster/tests/partition_balancer_autodecom_test.cc @@ -0,0 +1,532 @@ +// Copyright 2025 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "cluster/health_monitor_types.h" +#include "cluster/partition_balancer_planner.h" + +#include + +#include + +namespace cluster { + +// testing a private static function in partition_balancer_planner, we're using +// a friend class accessor to reach in +class partition_balancer_planner_accessor { + using params = cluster::partition_balancer_planner:: + do_get_auto_decommission_actions_params; + + // convenience representation of node state + // up_down: represents whether a has been down past the auto decom timeout + // node_state: relevant special node statuses that should prevent a decom + // node_and_status: packaged node state so a test scenario can be a list of + // node_id and node state + enum class up_down : uint8_t { up, down }; + enum class node_state : uint8_t { normal, maintenance, decommissioning }; + struct node_and_status { + up_down up_or_down; + node_state node_state; + model::node_id node_id; + config_version config_version; + }; + + // package a set of a params for determinining auto decommission along side + // a holder which will keep referenced memory alive + struct params_and_holders { + params params; + std::vector>> + report_memory_holder; + }; + + // translate the convenience representation to a parameters pack, and a list + // of memory holders to keep the packaged references alive + static params_and_holders + make_params(const std::vector& nodes) { + absl::flat_hash_set all_nodes{}; + absl::flat_hash_set downed_nodes{}; + absl::flat_hash_set maintenance_nodes{}; + absl::flat_hash_set decommissioning_nodes{}; + + for (const auto& node_status : nodes) { + if (node_status.up_or_down == up_down::down) { + 
downed_nodes.insert(node_status.node_id); + } + all_nodes.insert(node_status.node_id); + + if (node_status.node_state == node_state::maintenance) { + maintenance_nodes.insert(node_status.node_id); + } + if (node_status.node_state == node_state::decommissioning) { + decommissioning_nodes.insert(node_status.node_id); + } + // nothing for normal + } + + // keeps the memory of a given auto decom report alive + std::vector>> + memory_holder{}; + + // index map upon the living reports + partition_balancer_planner::auto_decom_ref_map decom_report_map{}; + + for (const auto& node_status : nodes) { + if (downed_nodes.contains(node_status.node_id)) { + // skip downed nodes for report generation + continue; + } + + // for living nodes, gather the bearing memory and reference list on + // their auto decom reports + auto report_holder + = std::make_unique>( + std::nullopt); + // generate an actual decom status if there are dead nodes + if (!downed_nodes.empty()) { + auto_decommission_status_data adsd{ + .configuration_version = node_status.config_version, + .nodes_past_auto_decom_timeout = std::vector( + downed_nodes.begin(), downed_nodes.end())}; + report_holder + = std::make_unique>( + std::optional{std::move(adsd)}); + } + decom_report_map.emplace( + node_status.node_id, + std::reference_wrapper(std::as_const(*report_holder))); + memory_holder.emplace_back(std::move(report_holder)); + } + + // clang keeps putting the entire list of initializers on one line + // clang-format off + return params_and_holders{ + .params + = params{ + .node_to_decom = std::move(decom_report_map), + .cluster_members = std::move(all_nodes), + .decommissioning_nodes = std::move(decommissioning_nodes), + .maintenance_mode_nodes = std::move(maintenance_nodes), + .current_config = config_version{}}, + .report_memory_holder = std::move(memory_holder)}; + // clang-format on + } + +public: + static void smoke_test() { + params smoke_params{ + .node_to_decom = {}, + .cluster_members + = {model::node_id{0}, 
model::node_id{2}, model::node_id{1}}, + .decommissioning_nodes = {}, + .maintenance_mode_nodes = {}, + .current_config = config_version{0}}; + + auto result + = partition_balancer_planner::do_get_auto_decommission_actions( + smoke_params); + ASSERT_TRUE(result.empty()); + } + + static void test_normal_removal() { + // check that we can decommission a node when a majority votes that its + // past timeout + auto [params, holder] = make_params( + {// node 0 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{0}, + .config_version = config_version{0}}, + + // node 1 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{1}, + .config_version = config_version{0}}, + + // node 3 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{3}, + .config_version = config_version{0}}, + + // node 2 down + node_and_status{ + .up_or_down = up_down::down, + .node_state = node_state::normal, + .node_id = model::node_id{2}, + .config_version = config_version{0}}}); + params.current_config = config_version{0}; + + auto result + = partition_balancer_planner::do_get_auto_decommission_actions( + params); + ASSERT_EQ(result.size(), 1); + + ASSERT_EQ(result.begin().operator*(), model::node_id{2}); + } + + static void test_votes_no_quorum() { + // check that we don't decommission if we dont have a quorum of votes + auto [params, holder] = make_params( + {// node 0 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{0}, + .config_version = config_version{0}}, + + // node 1 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{1}, + .config_version = config_version{0}}, + + // node 3 up and normal + node_and_status{ + .up_or_down = 
up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{3}, + .config_version = config_version{0}}, + + // node 2 down + node_and_status{ + .up_or_down = up_down::down, + .node_state = node_state::normal, + .node_id = model::node_id{2}, + .config_version = config_version{0}}}); + params.current_config = config_version{0}; + + // pluck node 3's report from the reports map + params.node_to_decom.erase(model::node_id{3}); + // now there are two reports that 2 is dead, from 0 and 1 + + auto result + = partition_balancer_planner::do_get_auto_decommission_actions( + params); + ASSERT_EQ(result.size(), 0); + } + + static void test_ignore_incorrect_config() { + // check that we ignore votes from old configurations + config_version current_config{1}; + config_version old_config{0}; + auto [params, holder] = make_params( + {// node 0 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{0}, + .config_version = current_config}, + + // node 1 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{1}, + .config_version = current_config}, + + // node 3 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{3}, + .config_version = old_config}, + + // node 2 down + node_and_status{ + .up_or_down = up_down::down, + .node_state = node_state::normal, + .node_id = model::node_id{2}, + .config_version = current_config}}); + + params.current_config = current_config; + + auto result + = partition_balancer_planner::do_get_auto_decommission_actions( + params); + ASSERT_EQ(result.size(), 0); + } + + static void test_dont_decom_twice() { + // make sure a decommissioning node doesn't get another decommissioning + // command + auto [params, holder] = make_params( + {// node 0 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = 
node_state::normal, + .node_id = model::node_id{0}, + .config_version = config_version{0}}, + + // node 1 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{1}, + .config_version = config_version{0}}, + + // node 3 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{3}, + .config_version = config_version{0}}, + + // node 2 down + node_and_status{ + .up_or_down = up_down::down, + .node_state = node_state::decommissioning, + .node_id = model::node_id{2}, + .config_version = config_version{0}}}); + params.current_config = config_version{0}; + + auto result + = partition_balancer_planner::do_get_auto_decommission_actions( + params); + ASSERT_EQ(result.size(), 0); + } + + static void test_dont_decom_maintenance() { + // if a node is in maintenance mode, dont decommission it + auto [params, holder] = make_params( + {// node 0 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{0}, + .config_version = config_version{0}}, + + // node 1 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{1}, + .config_version = config_version{0}}, + + // node 3 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{3}, + .config_version = config_version{0}}, + + // node 2 down + node_and_status{ + .up_or_down = up_down::down, + .node_state = node_state::maintenance, + .node_id = model::node_id{2}, + .config_version = config_version{0}}}); + params.current_config = config_version{0}; + + auto result + = partition_balancer_planner::do_get_auto_decommission_actions( + params); + ASSERT_EQ(result.size(), 0); + } + + static void test_ignore_non_members() { + // make sure that we ignore reports from nodes that are no longer + // 
cluster members + config_version current_config{1}; + config_version old_config{0}; + auto [params, holder] = make_params({ + // node 0 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{0}, + .config_version = current_config}, + + // node 1 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{1}, + .config_version = current_config}, + + // node 3 up and normal + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{3}, + .config_version = old_config}, + + // node 2 down + node_and_status{ + .up_or_down = up_down::down, + .node_state = node_state::maintenance, + .node_id = model::node_id{2}, + .config_version = current_config}, + + // node will be a non-member + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{4}, + .config_version = current_config}, + + // node 5 will be a non-member + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = model::node_id{5}, + .config_version = current_config}, + + }); + params.current_config = current_config; + + // drop 4 and 5, this should leave not enough votes to remove 2 + params.cluster_members.erase(model::node_id{4}); + params.cluster_members.erase(model::node_id{5}); + + auto result + = partition_balancer_planner::do_get_auto_decommission_actions( + params); + ASSERT_EQ(result.size(), 0); + } + + static void test_multi_decom() { + // checks that given a large enough cluster, we can decom multiple nodes + // at the same time + config_version current_config{0}; + std::vector nodes_and_statuses{}; + + for (auto node_number : std::ranges::iota_view(0, 5)) { + auto node_id = model::node_id{node_number}; + nodes_and_statuses.emplace_back( + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + 
.node_id = node_id, + .config_version = current_config}); + } + nodes_and_statuses[0].up_or_down = up_down::down; + nodes_and_statuses[1].up_or_down = up_down::down; + auto [params, holder] = make_params(nodes_and_statuses); + params.current_config = current_config; + + auto result + = partition_balancer_planner::do_get_auto_decommission_actions( + params); + ASSERT_EQ(result.size(), 2); + ASSERT_TRUE(result.contains(model::node_id{0})); + ASSERT_TRUE(result.contains(model::node_id{1})); + } + + static void test_quorum_limits() { + // we should only decom a node if a quorum agrees that the node is past + // timeout + // this test checks the limits for even & odd cluster numbers + config_version current_config{0}; + std::vector nodes_and_statuses{}; + int dead_node_number{0}; + model::node_id dead_node{dead_node_number}; + + { + // case: 5 + for (auto node_number : std::ranges::iota_view(0, 5)) { + auto node_id = model::node_id{node_number}; + nodes_and_statuses.emplace_back( + node_and_status{ + .up_or_down = up_down::up, + .node_state = node_state::normal, + .node_id = node_id, + .config_version = current_config}); + } + nodes_and_statuses[dead_node_number].up_or_down = up_down::down; + auto [params, holder] = make_params(nodes_and_statuses); + params.current_config = current_config; + { // all agree + auto result = partition_balancer_planner:: + do_get_auto_decommission_actions(params); + ASSERT_EQ(result.size(), 1); + ASSERT_TRUE(result.contains(dead_node)); + } + { // 3 agree 1 does not + params.node_to_decom.erase(model::node_id{1}); + auto result = partition_balancer_planner:: + do_get_auto_decommission_actions(params); + ASSERT_EQ(result.size(), 1); + ASSERT_TRUE(result.contains(dead_node)); + } + { // 2 agree 2 do not + params.node_to_decom.erase(model::node_id{2}); + auto result = partition_balancer_planner:: + do_get_auto_decommission_actions(params); + ASSERT_EQ(result.size(), 0); + } + } + { + // case: 6 + for (auto node_number : 
std::ranges::iota_view(5, 6)) { // nodes 0-4 carry over from case 5
partition_balancer_planner_accessor::test_quorum_limits(); +} +} // namespace cluster From 59ddbd4ac6b954622bcadfdf7a48b751493d57fc Mon Sep 17 00:00:00 2001 From: joe-redpanda Date: Wed, 17 Dec 2025 10:31:41 -0800 Subject: [PATCH 10/11] health_monitor_autodecom_test: test report gen Adds unit tests on the generation of auto decommission status reports within health_monitor_backend. Tests the following: 1. don't report when nothing is timed out 2. report when a node is past timeout and uptime is past timeout 3. default last seen to timepoint of bootup 4. config is passed through 5. ignore non-members in reporting 6. multiple nodes can be included in reports --- src/v/cluster/tests/BUILD | 19 ++ .../tests/health_monitor_autodecom_test.cc | 279 ++++++++++++++++++ 2 files changed, 298 insertions(+) create mode 100644 src/v/cluster/tests/health_monitor_autodecom_test.cc diff --git a/src/v/cluster/tests/BUILD b/src/v/cluster/tests/BUILD index 292bd088fcd24..42b7e99235a7b 100644 --- a/src/v/cluster/tests/BUILD +++ b/src/v/cluster/tests/BUILD @@ -1334,3 +1334,22 @@ redpanda_cc_gtest( "@seastar", ], ) + +redpanda_cc_gtest( + name = "health_monitor_autodecom_test", + timeout = "short", + srcs = [ + "health_monitor_autodecom_test.cc", + ], + deps = [ + "//src/v/cluster", + "//src/v/config", + "//src/v/container:chunked_vector", + "//src/v/model", + "//src/v/random:generators", + "//src/v/rpc", + "//src/v/test_utils:gtest", + "@googletest//:gtest", + "@seastar", + ], +) diff --git a/src/v/cluster/tests/health_monitor_autodecom_test.cc b/src/v/cluster/tests/health_monitor_autodecom_test.cc new file mode 100644 index 0000000000000..ffc1b67bca9c6 --- /dev/null +++ b/src/v/cluster/tests/health_monitor_autodecom_test.cc @@ -0,0 +1,279 @@ +// Copyright 2025 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "cluster/health_monitor_backend.h" +#include "cluster/health_monitor_types.h" +#include "model/fundamental.h" +#include "rpc/types.h" + +#include + +#include +#include + +namespace cluster { + +struct health_report_accessor { + using params + = health_monitor_backend::do_collect_auto_decommission_status_params; + + static constexpr auto now = rpc::clock_type::time_point{72h}; + static constexpr auto auto_decom_timeout = 24h; + static constexpr auto timed_out_timepoint = now - auto_decom_timeout - 1h; + static constexpr auto not_timed_out_timepoint = now - 12h; + static constexpr auto boot_time_zero = rpc::clock_type::time_point{0h}; + static constexpr auto boot_time_recent = rpc::clock_type::time_point{ + not_timed_out_timepoint - 2h}; + + static params get_default_params() { + return { + .timeout_duration = auto_decom_timeout, + .now = now, + .default_last_seen = boot_time_zero, + .nodes = {}, + .node_status_getter = + [](model::node_id node_id) { + return node_status{.node_id = node_id, .last_seen = now}; + }, + .current_config_version = config_version{0} + + }; + } + + static health_monitor_backend::get_node_status_t getter_from_map( + std::unordered_map + last_seen_map) { + auto node_status_map + = std::move(last_seen_map) + | std::ranges::views::transform([](auto node_and_last_seen) { + return std::pair{ + node_and_last_seen.first, + node_status{ + .node_id = node_and_last_seen.first, + .last_seen = node_and_last_seen.second}}; + }) + | std::ranges::to< + std::unordered_map>(); + + return [node_status_map = std::move(node_status_map)]( + model::node_id node_id) -> std::optional { + auto it = node_status_map.find(node_id); + if (it == node_status_map.end()) { 
+ return std::nullopt; + } + return it->second; + }; + } + + static std::vector members_from_map( + std::unordered_map + last_seen_map) { + auto members = std::move(last_seen_map) + | std::ranges::views::transform( + [](auto node_and_last_seen) { + return node_and_last_seen.first; + }) + | std::ranges::to>(); + return members; + } + + static std::pair< + std::vector, + health_monitor_backend::get_node_status_t> + members_and_getter_from_map( + std::unordered_map + last_seen_map) { + return { + members_from_map(last_seen_map), + getter_from_map(std::move(last_seen_map))}; + } + + // just checks that a healthy cluster spits out no decom reports + static void smoke_test() { + auto [members, getter] = members_and_getter_from_map( + {{model::node_id{0}, now}, + {model::node_id{1}, now}, + {model::node_id{2}, now}}); + auto params = get_default_params(); + params.nodes = std::move(members); + params.node_status_getter = std::move(getter); + + auto maybe_decom_status + = health_monitor_backend::do_collect_auto_decommission_status(params); + ASSERT_FALSE(maybe_decom_status.has_value()); + } + + // check that we get a decom report for an expired node + static void test_timed_out() { + auto [members, getter] = members_and_getter_from_map( + {{model::node_id{0}, now}, + {model::node_id{1}, now}, + {model::node_id{2}, timed_out_timepoint}}); + auto params = get_default_params(); + params.nodes = std::move(members); + params.node_status_getter = std::move(getter); + + auto maybe_decom_status + = health_monitor_backend::do_collect_auto_decommission_status(params); + ASSERT_TRUE(maybe_decom_status.has_value()); + ASSERT_EQ(maybe_decom_status->nodes_past_auto_decom_timeout.size(), 1); + ASSERT_TRUE( + maybe_decom_status->nodes_past_auto_decom_timeout[0] + == model::node_id{2}); + } + + // check that we refuse to decom a node if never seen when the default is + // above timeout timepoint + static void test_missing_status_no_timeout() { + std::unordered_map + node_to_last_seen = { + 
{model::node_id{0}, now}, + {model::node_id{1}, now}, + {model::node_id{2}, now}}; + + // members will have all the nodes + auto members = members_from_map(node_to_last_seen); + + // we'll be missing node status from 2 + node_to_last_seen.erase(model::node_id{2}); + auto getter = getter_from_map(node_to_last_seen); + + // construct params with a default time that is not timed out + auto params = get_default_params(); + params.nodes = std::move(members); + params.node_status_getter = std::move(getter); + params.default_last_seen = not_timed_out_timepoint; + + auto maybe_decom_status + = health_monitor_backend::do_collect_auto_decommission_status(params); + ASSERT_FALSE(maybe_decom_status.has_value()); + } + + // check that we do decom a missing node if the default time provided is far + // enough back to invoke timeout + static void test_missing_status_timeout() { + std::unordered_map + node_to_last_seen = { + {model::node_id{0}, now}, + {model::node_id{1}, now}, + {model::node_id{2}, now}}; + + // members will have all the nodes + auto members = members_from_map(node_to_last_seen); + + // we'll be missing node status from 2 + node_to_last_seen.erase(model::node_id{2}); + auto getter = getter_from_map(node_to_last_seen); + + // construct params with a default time that is not timed out + auto params = get_default_params(); + params.nodes = std::move(members); + params.node_status_getter = std::move(getter); + params.default_last_seen = boot_time_zero; + + auto maybe_decom_status + = health_monitor_backend::do_collect_auto_decommission_status(params); + ASSERT_TRUE(maybe_decom_status.has_value()); + ASSERT_EQ(maybe_decom_status->nodes_past_auto_decom_timeout.size(), 1); + ASSERT_TRUE( + maybe_decom_status->nodes_past_auto_decom_timeout[0] + == model::node_id{2}); + } + + // check that we're pushing the right config through + static void test_config() { + auto [members, getter] = members_and_getter_from_map( + {{model::node_id{0}, now}, + {model::node_id{1}, now}, + 
{model::node_id{2}, timed_out_timepoint}}); + auto params = get_default_params(); + params.nodes = std::move(members); + params.node_status_getter = std::move(getter); + params.current_config_version = config_version{0}; + { + auto maybe_decom_status + = health_monitor_backend::do_collect_auto_decommission_status( + params); + ASSERT_TRUE(maybe_decom_status.has_value()); + ASSERT_EQ( + maybe_decom_status->configuration_version, config_version{0}); + } + + { + params.current_config_version = config_version{100}; + auto maybe_decom_status + = health_monitor_backend::do_collect_auto_decommission_status( + params); + ASSERT_TRUE(maybe_decom_status.has_value()); + ASSERT_EQ( + maybe_decom_status->configuration_version, config_version{100}); + } + } + + // don't generate reports on non-cluster members + static void test_ignore_nonmembers() { + std::unordered_map + node_to_last_seen = { + {model::node_id{0}, now}, + {model::node_id{1}, now}, + {model::node_id{2}, timed_out_timepoint}}; + + // getter will have all + auto getter = getter_from_map(node_to_last_seen); + // 2 will not be a member + node_to_last_seen.erase(model::node_id{2}); + auto members = members_from_map(node_to_last_seen); + + // construct params with a default time that is not timed out + auto params = get_default_params(); + params.nodes = std::move(members); + params.node_status_getter = std::move(getter); + params.default_last_seen = boot_time_zero; + + auto maybe_decom_status + = health_monitor_backend::do_collect_auto_decommission_status(params); + ASSERT_FALSE(maybe_decom_status.has_value()); + } + + // time out multiple nodes + static void test_multi_time_out() { + auto [members, getter] = members_and_getter_from_map( + {{model::node_id{0}, timed_out_timepoint}, + {model::node_id{1}, timed_out_timepoint}, + {model::node_id{2}, timed_out_timepoint}}); + auto params = get_default_params(); + params.nodes = members; + params.node_status_getter = std::move(getter); + + auto maybe_decom_status + = 
health_monitor_backend::do_collect_auto_decommission_status(params); + ASSERT_TRUE(maybe_decom_status.has_value()); + ASSERT_EQ(maybe_decom_status->nodes_past_auto_decom_timeout.size(), 3); + ASSERT_EQ(maybe_decom_status->nodes_past_auto_decom_timeout, members); + } +}; + +TEST(AutoDecomTestSuite, Smoke) { health_report_accessor::smoke_test(); }; +TEST(AutoDecomTestSuite, OneTimedOut) { + health_report_accessor::test_timed_out(); +}; +TEST(AutoDecomTestSuite, MissingStatusNoTimeout) { + health_report_accessor::test_missing_status_no_timeout(); +}; +TEST(AutoDecomTestSuite, MissingStatusTimeout) { + health_report_accessor::test_missing_status_timeout(); +}; +TEST(AutoDecomTestSuite, Config) { health_report_accessor::test_config(); }; +TEST(AutoDecomTestSuite, NoNonMembers) { + health_report_accessor::test_ignore_nonmembers(); +}; +TEST(AutoDecomTestSuite, MultiTimeout) { + health_report_accessor::test_multi_time_out(); +}; +} // namespace cluster From 6773d8ea96cccf7927f0aa0c6d0f038253e85352 Mon Sep 17 00:00:00 2001 From: joe-redpanda Date: Wed, 26 Nov 2025 11:03:01 -0800 Subject: [PATCH 11/11] auto_decommission_test: add auto decom integ tests Adds two tests. 1. smoke test: check that we can auto decom a node if it elapses the auto decom timeout 2. reset test: check that node restarts DO reset the timer on auto decommissioning --- tests/rptest/tests/auto_decommission_test.py | 360 +++++++++++++++++++ 1 file changed, 360 insertions(+) create mode 100644 tests/rptest/tests/auto_decommission_test.py diff --git a/tests/rptest/tests/auto_decommission_test.py b/tests/rptest/tests/auto_decommission_test.py new file mode 100644 index 0000000000000..88db9652c67fc --- /dev/null +++ b/tests/rptest/tests/auto_decommission_test.py @@ -0,0 +1,360 @@ +# Copyright 2020 Redpanda Data, Inc. 
+# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +import random +import time + +from ducktape.cluster.cluster import ClusterNode +from ducktape.utils.util import wait_until +from ducktape.tests.test import TestContext + +from rptest.clients.rpk import RpkTool +from rptest.clients.types import TopicSpec +from rptest.services.cluster import cluster +from rptest.services.kgo_verifier_services import ( + KgoVerifierMultiProducer, + KgoVerifierParams, + KgoVerifierMultiConsumerGroupConsumer, +) +from rptest.services.redpanda import CHAOS_LOG_ALLOW_LIST +from rptest.tests.prealloc_nodes import PreallocNodesTest + + +class AutoDecommissionTest(PreallocNodesTest): + """ + Test automatic node decommissioning when a node is unresponsive. + """ + + def __init__(self, test_context: TestContext): + self._topic = None + self._topics = None + + super(AutoDecommissionTest, self).__init__( + test_context=test_context, + num_brokers=5, + node_prealloc_count=2, + ) + + def setup(self): + # defer starting redpanda to test body + pass + + @property + def admin(self): + return self.redpanda._admin + + def _create_topics(self, replication_factors: list[int] = [1, 3]): + """ + :return: total number of partitions in all topics + """ + total_partitions = 0 + topics: list[TopicSpec] = [] + for enumeration in range(10): + partitions = random.randint(1, 10) + spec = TopicSpec( + name=f"topic-{enumeration}", + partition_count=partitions, + replication_factor=random.choice(replication_factors), + ) + topics.append(spec) + total_partitions += partitions + + rpk = RpkTool(self.redpanda) + for spec in topics: + rpk.create_topic( + topic=spec.name, + partitions=spec.partition_count, + replicas=spec.replication_factor, + ) + + # self._topic = 
random.choice(topics).name + self._topics = topics + + return total_partitions + + def _not_decommissioned_node(self, decommed_node_id: int) -> ClusterNode: + return [ + n + for n in self.redpanda.started_nodes() + if self.redpanda.node_id(n) != decommed_node_id + ][0] + + @property + def msg_size(self) -> int: + return 64 + + @property + def msg_count(self) -> int: + # test should run for ~90s, so throughput over msg size * expected runtime should yield runtime + return int(90 * self.producer_throughput / self.msg_size) + + @property + def producer_throughput(self) -> int: + # this is the total throughput for the entire producer + return 1024 + + def _get_messages_per_topic(self) -> int: + # total messages over number of topics + assert self._topics is not None, ( + "_topics list must be initialized by the time _get_messages_per_topic is called" + ) + return int(self.msg_count / len(self._topics)) + + def _get_throughput_per_topic(self) -> int: + # total throughput over number of topics + assert self._topics is not None, ( + "_topics list must be initialized by the time _get_throughput_per_topic is called" + ) + return int(self.producer_throughput / len(self._topics)) + + def _start_producer(self) -> None: + self.redpanda.logger.info( + f"starting kgo-verifier producer with expected runtime of {self.msg_count / self.producer_throughput}" + ) + assert self._topics is not None, "topics must be defined to start producer" + params = [ + KgoVerifierParams( + topic=topic, + msg_size=self.msg_size, + msg_count=self._get_messages_per_topic(), + rate_limit_bps=self._get_throughput_per_topic(), + node=self.preallocated_nodes[0], + ) + for topic in self._topics + ] + self.producer = KgoVerifierMultiProducer( + context=self.test_context, + redpanda=self.redpanda, + topics=params, + custom_node=self.preallocated_nodes[0:1], + ) + + self.producer.start() + + self.producer.wait_for_acks([10 for _ in self._topics], 15, 1) + + def _start_consumer(self) -> None: + assert 
self._topics is not None, "topics must be defined to start consumer" + params = [ + KgoVerifierParams( + topic=topic, + msg_size=self.msg_size, + msg_count=self._get_messages_per_topic(), + node=self.preallocated_nodes[1], + ) + for topic in self._topics + ] + self.consumer = KgoVerifierMultiConsumerGroupConsumer( + self.test_context, + self.redpanda, + topics=params, + custom_node=self.preallocated_nodes[1:], + ) + + self.consumer.start() + + def verify(self): + self.redpanda.logger.info( + f"verifying workload: topic: {self._topic}, " + + f"with [rate_limit: {self.producer_throughput}, message size: {self.msg_size}," + + f"message count: {self.msg_count}]" + ) + # let the producer and consumer finish + self.producer.wait() + self.consumer.wait() + + def start_redpanda(self, new_bootstrap: bool = True): + if new_bootstrap: + self.redpanda.set_seed_servers(self.redpanda.nodes) + + self.redpanda.start( + auto_assign_node_id=new_bootstrap, omit_seeds_on_idx_one=not new_bootstrap + ) + + @cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST) + def test_automatic_node_decommissioning(self): + """ + Test that a node is automatically decommissioned when it's unresponsive + for the configured timeout period. 
+ """ + # tick < unavailable < autodecommission + partition_balancer_tick_interval_ms = 5000 + partition_balancer_unavailable_timeout_s = 15 + autodecommission_timeout_s = 30 + + # Configure partition autobalancing for auto-decommission + self.redpanda.add_extra_rp_conf( + { + "partition_autobalancing_mode": "continuous", + "partition_autobalancing_node_availability_timeout_sec": partition_balancer_unavailable_timeout_s, + "partition_autobalancing_node_autodecommission_time": autodecommission_timeout_s, + "partition_autobalancing_tick_interval_ms": partition_balancer_tick_interval_ms, + } + ) + + self.start_redpanda(new_bootstrap=True) + self._create_topics(replication_factors=[3]) + + self._start_producer() + self._start_consumer() + + # Select a random node to make unresponsive + to_decommission = random.choice(self.redpanda.nodes) + node_id = self.redpanda.node_id(to_decommission) + + self.redpanda.logger.info( + f"Stopping node {node_id} to trigger automatic decommissioning" + ) + + # Stop the node so it registers in the decom logic + self.redpanda.stop_node(node=to_decommission) + + # Wait for the timeout period plus buffer for processing + # Total wait: timeout + extra time for detection and decommission start + wait_time_sec = autodecommission_timeout_s * 4 + self.redpanda.logger.info( + f"Waiting {wait_time_sec} seconds for automatic decommissioning to trigger" + ) + + # Just make sure we're not pinging the dead node + survivor_node = self._not_decommissioned_node(node_id) + + # Verify that the node status changes to 'draining' (decommissioning) + def node_is_removed(): + try: + brokers = self.admin.get_brokers(node=survivor_node) + for b in brokers: + if b["node_id"] == node_id: + return False + return True + except Exception as e: + self.redpanda.logger.info(f"Error checking broker status: {e}") + return False + + # Decommission will remove the node, wait for it + wait_until( + node_is_removed, + timeout_sec=wait_time_sec, + backoff_sec=5, + 
err_msg=f"Node {node_id} was not automatically decommissioned after {wait_time_sec} seconds", + ) + + self.redpanda.logger.info(f"Node {node_id} was successfully removed") + + # Finish the producers and consumers + self.verify() + + @cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST) + def test_decom_timer_reset(self): + """ + Auto decommission safety requires that the timer for auto decommission resets when a node restarts + (if this weren't the case, a restart of a quorum of nodes could easily cause an early node ejection). + This test is meant to check that auto ejection DOES get delayed by a sufficient number of node restarts. + """ + # tick < unavailable < autodecommission + partition_balancer_tick_interval_ms = 5000 + partition_balancer_unavailable_timeout_s = 15 + autodecommission_timeout_s = 60 + + # Configure partition autobalancing for auto-decommission + self.redpanda.add_extra_rp_conf( + { + "partition_autobalancing_mode": "continuous", + "partition_autobalancing_node_availability_timeout_sec": partition_balancer_unavailable_timeout_s, + "partition_autobalancing_node_autodecommission_time": autodecommission_timeout_s, + "partition_autobalancing_tick_interval_ms": partition_balancer_tick_interval_ms, + } + ) + + self.start_redpanda(new_bootstrap=True) + self._create_topics(replication_factors=[3]) + + self._start_producer() + self._start_consumer() + + # Select a random node to make unresponsive + to_decommission = random.choice(self.redpanda.nodes) + to_decommission_node_id = self.redpanda.node_id(to_decommission) + + # just make sure we're not pinging the dead node + survivor_node = self._not_decommissioned_node(to_decommission_node_id) + + self.redpanda.logger.info( + f"Stopping node {to_decommission_node_id} to trigger automatic decommissioning" + ) + + # Stop the node so it decoms + self.redpanda.stop_node(node=to_decommission) + + # Sleep for half of the auto decom timeout + time.sleep(autodecommission_timeout_s / 2) + + # Restart the 
remaining nodes + self.redpanda.restart_nodes( + nodes=[ + node + for node in self.redpanda.nodes + if self.redpanda.node_id(node) + is not self.redpanda.node_id(to_decommission) + ], + auto_assign_node_id=True, # on restart, let the nodes resume with their prior id + ) + + # Wait for the controller to returns + controller_leader_id = self.admin.await_stable_leader( + "controller", namespace="redpanda" + ) + self.redpanda.logger.debug(f"controller leader id: {controller_leader_id}") + + # The leader balancer starts after 30s, sleep for 45 seconds at which point + # a balancer action should have happened and if the timer was genuinely reset + # , the dead node should not be eligable for decommissioning yet + time.sleep(autodecommission_timeout_s * 0.75) + + # Check that the dead broker has not been decommissioned + broker_statuses = self.admin.get_brokers(node=survivor_node) + was_decommissioned: bool = to_decommission_node_id not in [ + int(broker_status["node_id"]) for broker_status in broker_statuses + ] + assert was_decommissioned == False, ( + "a restart of the brokers should reset the autodecommission timeout" + ) + + # Wait for the remainder of the timeout to make sure it does eventually decom + wait_time_sec = autodecommission_timeout_s * 2 + self.redpanda.logger.info( + f"Waiting {wait_time_sec} seconds for automatic decommissioning to trigger" + ) + + # Verify that the node status changes to 'draining' (decommissioning) + def node_is_removed(): + try: + brokers = self.admin.get_brokers(node=survivor_node) + for b in brokers: + if b["node_id"] == to_decommission_node_id: + return False + return True + except Exception as e: + self.redpanda.logger.info(f"Error checking broker status: {e}") + return False + + # Wait for the node to be removed from the + wait_until( + node_is_removed, + timeout_sec=wait_time_sec, + backoff_sec=5, + err_msg=f"Node {to_decommission_node_id} was not automatically decommissioned after {wait_time_sec} seconds", + ) + + 
self.redpanda.logger.info( + f"Node {to_decommission_node_id} was automatically marked for decommissioning" + ) + + # Finish the producers and consumers + self.verify()