Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/v/cluster/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ redpanda_cc_library(
"cluster_recovery_reconciler.cc",
"cluster_recovery_table.cc",
"cluster_utils.cc",
"config/cluster_config.cc",
"config_frontend.cc",
"config_manager.cc",
"controller.cc",
Expand Down Expand Up @@ -486,6 +487,8 @@ redpanda_cc_library(
"cluster_utils.h",
"cluster_uuid.h",
"commands.h",
"config/cluster_config.h",
"config/config_i.h",
"config_frontend.h",
"config_manager.h",
"controller.h",
Expand Down
20 changes: 20 additions & 0 deletions src/v/cluster/config/cluster_config.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@

#include "cluster/config/cluster_config.h"

#include "cluster/config_manager.h"
#include "cluster/types.h"

namespace cluster {

// Adapter that exposes the sharded config_manager through the narrow
// config_i interface. Stores a reference only: the config_manager service
// must outlive this object.
cluster_config::cluster_config(
  ss::sharded<config_manager>& config_manager) noexcept
  : _config_manager(config_manager) {}

// Returns the currently applied cluster configuration version.
// Must be invoked on config_manager::shard (enforced below); despite the
// std::expected return, this implementation never produces an error.
std::expected<config_version, std::error_code>
cluster_config::get_config() const noexcept {
    vassert(
      ss::this_shard_id() == cluster::config_manager::shard,
      "This operation can only be invoked on the config_manager shard");
    return _config_manager.local().get_version();
}

} // namespace cluster
19 changes: 19 additions & 0 deletions src/v/cluster/config/cluster_config.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@

#pragma once

#include "cluster/config/config_i.h"
#include "cluster/config_manager.h"

namespace cluster {

// Production implementation of config_i, backed by the sharded
// config_manager service. A thin adapter so that consumers (e.g. health
// monitoring) can depend on a narrow, test-mockable interface instead of the
// full config_manager.
class cluster_config : public config_i {
public:
    // Keeps a reference only; `config_manager` must outlive this object.
    explicit cluster_config(
      ss::sharded<config_manager>& config_manager) noexcept;
    // Returns the current cluster config version. May only be called on
    // config_manager::shard (asserted in the .cc definition).
    std::expected<config_version, std::error_code>
    get_config() const noexcept override;

private:
    ss::sharded<config_manager>& _config_manager;
};
} // namespace cluster
18 changes: 18 additions & 0 deletions src/v/cluster/config/config_i.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

#pragma once

#include "cluster/types.h"

#include <expected>
// Include what we use: std::error_code is referenced directly below rather
// than relying on a transitive include from cluster/types.h.
#include <system_error>

namespace cluster {

// Interface for reading cluster configuration state. Abstracting this behind
// an interface lets tests substitute a canned implementation
// (cluster/config/test/test_config.h).
class config_i {
public:
    // Returns the currently applied cluster config version, or an error code
    // when it cannot be determined. [[nodiscard]]: silently dropping the
    // result would discard a potential error.
    [[nodiscard]] virtual std::expected<config_version, std::error_code>
    get_config() const noexcept = 0;

    virtual ~config_i() = default;
};
} // namespace cluster
12 changes: 12 additions & 0 deletions src/v/cluster/config/test/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
load("//bazel:build.bzl", "redpanda_cc_library")

# Header-only test double implementing cluster::config_i; lets unit tests
# inject canned config_version results without wiring up a live
# config_manager (hence the dependency on //src/v/cluster for the interface).
redpanda_cc_library(
    name = "test_config",
    hdrs = [
        "test_config.h",
    ],
    visibility = ["//visibility:public"],
    deps = [
        "//src/v/cluster",
    ],
)
31 changes: 31 additions & 0 deletions src/v/cluster/config/test/test_config.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@

#pragma once

#include "cluster/config/config_i.h"
#include "cluster/types.h"

#include <expected>
#include <functional>
#include <memory>
#include <system_error>
#include <utility>

namespace cluster::test {

// Test double for config_i: returns whatever the injected callable produces,
// so unit tests can simulate arbitrary config versions or error paths.
class test_config : public config_i {
public:
    // Signature of the injected behavior: produce a version or an error.
    using functor_t
      = std::function<std::expected<config_version, std::error_code>()>;

    // NOTE(review): an empty functor_t would throw std::bad_function_call
    // inside the noexcept get_config below and terminate — callers must pass
    // a non-empty callable.
    explicit test_config(functor_t function) noexcept
      : _function(std::move(function)) {}

    std::expected<config_version, std::error_code>
    get_config() const noexcept override {
        return _function();
    }

    // Convenience factory: a stub that always reports config_version{0}.
    // (Dropped redundant `inline`: member functions defined in-class are
    // implicitly inline.)
    static std::unique_ptr<config_i> make_default() {
        return std::make_unique<test_config>([] {
            return std::expected<config_version, std::error_code>{
              config_version{0}};
        });
    }

private:
    functor_t _function;
};
} // namespace cluster::test
14 changes: 12 additions & 2 deletions src/v/cluster/controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "cluster/cluster_link/table.h"
#include "cluster/cluster_recovery_table.h"
#include "cluster/cluster_utils.h"
#include "cluster/config/cluster_config.h"
#include "cluster/config_frontend.h"
#include "cluster/controller_api.h"
#include "cluster/controller_backend.h"
Expand Down Expand Up @@ -227,7 +228,10 @@ ss::future<> controller::wire_up() {
std::ref(_tp_state),
std::ref(_members_table),
std::ref(_partition_allocator),
std::ref(_node_status_table));
std::ref(_node_status_table),
ss::sharded_parameter([this] {
return std::make_unique<cluster_config>(_config_manager);
}));
})
.then([this] {
return _shard_placement.start(
Expand Down Expand Up @@ -727,7 +731,11 @@ ss::future<> controller::start(
std::ref(_drain_manager),
std::ref(_feature_table),
std::ref(_partition_leaders),
std::ref(_tp_state));
std::ref(_tp_state),
std::ref(_node_status_table),
ss::sharded_parameter([this]() {
return std::make_unique<cluster_config>(_config_manager);
}));

_leader_balancer = std::make_unique<leader_balancer>(
_tp_state.local(),
Expand Down Expand Up @@ -798,6 +806,8 @@ ss::future<> controller::start(
std::ref(_members_frontend),
config::shard_local_cfg()
.partition_autobalancing_node_availability_timeout_sec.bind(),
config::shard_local_cfg()
.partition_autobalancing_node_autodecommission_time.bind(),
config::shard_local_cfg()
.partition_autobalancing_max_disk_usage_percent.bind(),
config::shard_local_cfg().partition_autobalancing_tick_interval_ms.bind(),
Expand Down
93 changes: 90 additions & 3 deletions src/v/cluster/health_monitor_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
#include "cluster/logger.h"
#include "cluster/members_table.h"
#include "cluster/node/local_monitor.h"
#include "cluster/node_status_table.h"
#include "cluster/partition_manager.h"
#include "cluster/partition_probe.h"
#include "cluster/types.h"
#include "config/configuration.h"
#include "config/property.h"
#include "container/chunked_hash_map.h"
Expand All @@ -47,6 +49,7 @@
#include <fmt/ranges.h>

#include <algorithm>
#include <chrono>
#include <iterator>
#include <optional>
#include <ranges>
Expand All @@ -67,7 +70,9 @@ health_monitor_backend::health_monitor_backend(
ss::sharded<drain_manager>& drain_manager,
ss::sharded<features::feature_table>& feature_table,
ss::sharded<partition_leaders_table>& partition_leaders_table,
ss::sharded<topic_table>& topic_table)
ss::sharded<topic_table>& topic_table,
ss::sharded<node_status_table>& node_status_table,
std::unique_ptr<config_i> config)
: _raft0(std::move(raft0))
, _members(mt)
, _connections(connections)
Expand All @@ -78,6 +83,8 @@ health_monitor_backend::health_monitor_backend(
, _feature_table(feature_table)
, _partition_leaders_table(partition_leaders_table)
, _topic_table(topic_table)
, _node_status_table(node_status_table)
, _config(std::move(config))
, _reports{ss::make_lw_shared<report_cache_t>()}
, _local_monitor(local_monitor)
, _self(_raft0->self().id()) {}
Expand Down Expand Up @@ -199,7 +206,11 @@ std::optional<node_health_report_ptr> health_monitor_backend::build_node_report(
}

node_health_report ret{
it->second->id, it->second->local_state, {}, it->second->drain_status};
it->second->id,
it->second->local_state,
{},
it->second->drain_status,
it->second->maybe_auto_decommission_status};
ret.local_state.logical_version
= features::feature_table::get_latest_logical_version();
ret.topics = filter_topic_status(it->second->topics, f.ntp_filters);
Expand Down Expand Up @@ -888,13 +899,18 @@ health_monitor_backend::collect_current_node_health() {

auto drain_status = co_await _drain_manager.local().status();
auto topics = co_await collect_topic_status();
auto maybe_auto_decommission_status = collect_auto_decommission_status();

auto [it, _] = _status.try_emplace(id);
it->second.is_alive = alive::yes;
it->second.last_reply_timestamp = ss::lowres_clock::now();

co_return node_health_report{
id, std::move(local_state), std::move(topics), std::move(drain_status)};
id,
std::move(local_state),
std::move(topics),
drain_status,
std::move(maybe_auto_decommission_status)};
}
ss::future<result<node_health_report_ptr>>
health_monitor_backend::get_current_node_health() {
Expand Down Expand Up @@ -1024,6 +1040,7 @@ reports_acc_t reduce_reports_map(reports_acc_t acc, shard_report shard_report) {
return acc;
}
} // namespace

ss::future<chunked_vector<topic_status>>
health_monitor_backend::collect_topic_status() {
auto reports_map = co_await _partition_manager.map_reduce0(
Expand All @@ -1040,6 +1057,76 @@ health_monitor_backend::collect_topic_status() {
co_return topics;
}

namespace {
// small helper for getting the boot time of the redpanda application
// Approximates process start as "now minus reactor uptime", truncating
// uptime to whole seconds.
// NOTE(review): subtracts from rpc::clock_type::now() but returns an
// ss::lowres_clock::time_point — assumes the two aliases name the same
// clock; confirm, otherwise the epoch arithmetic is mixed between clocks.
ss::lowres_clock::time_point get_boot_time() {
    return rpc::clock_type::now()
           - std::chrono::duration_cast<std::chrono::seconds>(
             ss::engine().uptime());
}
} // namespace

// Snapshots which nodes are past the configured auto-decommission timeout,
// pairing the result with the current config version so downstream decisions
// use the same timeout value this snapshot was computed against. Gathers
// inputs from shard-local state (cluster config, members table,
// node_status_table) and delegates the pure computation to the static
// helper below.
std::optional<auto_decommission_status>
health_monitor_backend::collect_auto_decommission_status() {
    const auto timeout_duration
      = config::shard_local_cfg()
          .partition_autobalancing_node_autodecommission_time();

    auto maybe_config_version = _config->get_config();
    // get_config is an interface; the production implementation
    // (cluster_config::get_config) always returns a value, never an error.
    vassert(
      maybe_config_version,
      "current implementation of config_i::get_config should never return an "
      "error");
    auto config_version = *maybe_config_version;
    // bounce to static method
    return do_collect_auto_decommission_status(
      {.timeout_duration = timeout_duration,
       .now = rpc::clock_type::now(),
       // nodes never seen since this process started fall back to boot time,
       // so a freshly restarted monitor does not instantly flag them
       .default_last_seen = get_boot_time(),
       .nodes = _members.local().node_ids(),
       .node_status_getter = get_node_status_t{[this](model::node_id node_id) {
           return _node_status_table.local().get_node_status(node_id);
       }},
       .current_config_version = config_version});
}

std::optional<auto_decommission_status>
health_monitor_backend::do_collect_auto_decommission_status(
const do_collect_auto_decommission_status_params& params) {
// gather the list of all nodes past the configured auto decom time
// package that with the current configuration version s.t. a decision is
// guaranteed to be made with the same decom timeout

auto all_past_last_seen
= params.nodes | // get last seen, if missing clock min
std::ranges::views::transform(
[&params](const model::node_id node) -> node_status {
return params.node_status_getter(node).value_or(
node_status{
.node_id = node, .last_seen = params.default_last_seen});
})
| // filter to node statuses that are above timeout duration
std::ranges::views::filter([&params](const node_status status) {
return params.now - status.last_seen > params.timeout_duration;
})
| // grab the node ids
std::ranges::views::transform(
[](const node_status status) { return status.node_id; })
| // to vector
std::ranges::to<std::vector<model::node_id>>();

if (all_past_last_seen.empty()) [[likely]] {
// the overwhelming majority of the time the list should be empty
return std::nullopt;
}

auto_decommission_status status{
{.configuration_version = params.current_config_version,
.nodes_past_auto_decom_timeout = std::move(all_past_last_seen)}};

return status;
}

// Maximum age cached health metadata may reach before it is considered
// stale; read from the health_monitor_max_metadata_age cluster property.
std::chrono::milliseconds health_monitor_backend::max_metadata_age() {
    return config::shard_local_cfg().health_monitor_max_metadata_age();
}
Expand Down
25 changes: 24 additions & 1 deletion src/v/cluster/health_monitor_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@
*/
#pragma once
#include "absl/container/node_hash_map.h"
#include "cluster/config/config_i.h"
#include "cluster/fwd.h"
#include "cluster/health_monitor_types.h"
#include "cluster/node/local_monitor.h"
#include "cluster/node_status_table.h"
#include "cluster/notification.h"
#include "features/feature_table.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "rpc/fwd.h"
#include "ssx/semaphore.h"
Expand Down Expand Up @@ -71,7 +74,9 @@ class health_monitor_backend {
ss::sharded<drain_manager>&,
ss::sharded<features::feature_table>&,
ss::sharded<partition_leaders_table>&,
ss::sharded<topic_table>&);
ss::sharded<topic_table>&,
ss::sharded<node_status_table>&,
std::unique_ptr<config_i>);

ss::future<> stop();

Expand Down Expand Up @@ -165,6 +170,22 @@ class health_monitor_backend {

ss::future<chunked_vector<topic_status>> collect_topic_status();

// get the status info of all nodes which are past auto decommission timeout
std::optional<auto_decommission_status> collect_auto_decommission_status();
using get_node_status_t
= ss::noncopyable_function<std::optional<node_status>(model::node_id)>;
struct do_collect_auto_decommission_status_params {
std::chrono::seconds timeout_duration;
rpc::clock_type::time_point now;
rpc::clock_type::time_point default_last_seen;
std::vector<model::node_id> nodes;
get_node_status_t node_status_getter;
config_version current_config_version;
};
static std::optional<auto_decommission_status>
do_collect_auto_decommission_status(
const do_collect_auto_decommission_status_params& params);

result<node_health_report>
process_node_reply(model::node_id, result<get_node_health_reply>);

Expand Down Expand Up @@ -232,6 +253,8 @@ class health_monitor_backend {
ss::sharded<features::feature_table>& _feature_table;
ss::sharded<partition_leaders_table>& _partition_leaders_table;
ss::sharded<topic_table>& _topic_table;
ss::sharded<node_status_table>& _node_status_table;
std::unique_ptr<config_i> _config;

ss::lowres_clock::time_point _last_refresh;
ss::lw_shared_ptr<abortable_refresh_request> _refresh_request;
Expand Down
Loading
Loading