Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
504b41d
replica_pinning: add replicas_preference type and topic property
joe-redpanda Apr 20, 2026
78e69f3
replica_pinning: wire Kafka API and enterprise feature gating
joe-redpanda Apr 20, 2026
106afe2
replica_pinning: add allocation constraint and balancer integration
joe-redpanda Apr 20, 2026
5677688
replica_pinning: add C++ unit and deterministic tests
joe-redpanda Apr 20, 2026
128cd59
replica_pinning: add rptest integration tests
joe-redpanda Apr 20, 2026
3324279
partition_balancer_state: add replica-pinning topic cache scaffolding
joe-redpanda Apr 20, 2026
b5247bd
partition_balancer_state: test pinning cache seeds on demand
joe-redpanda Apr 20, 2026
dd62e96
partition_balancer_state: maintain pinning cache via topic deltas
joe-redpanda Apr 20, 2026
bc9a78d
partition_balancer_state: test pinning cache drops deleted topics
joe-redpanda Apr 20, 2026
227f624
partition_balancer_backend: reset pinning cache on raft0 term change
joe-redpanda Apr 20, 2026
48ef537
partition_balancer_planner: iterate replica-pinning cache only
joe-redpanda Apr 20, 2026
e4291da
partition_balancer_state: reset pinning cache on snapshot apply
joe-redpanda Apr 20, 2026
e36f608
partition_balancer_planner: two phase pinning
joe-redpanda Apr 21, 2026
8814879
partition_balancer_planner_test: add early exit test
joe-redpanda Apr 21, 2026
0634ee6
tests: add replica_pinning to enterprise features test
joe-redpanda Apr 22, 2026
6cc412b
tests: add redpanda.replicas.preference to describe_topics expected set
joe-redpanda Apr 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/v/cloud_storage/tests/topic_manifest_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ SEASTAR_THREAD_TEST_CASE(test_topic_manifest_serde_feature_table) {
std::nullopt,
std::nullopt,
model::redpanda_storage_mode::tiered,
std::nullopt,
};

auto random_initial_revision_id
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ redpanda_cc_library(
deps = [
"//src/v/base",
"//src/v/cloud_storage:remote_label",
"//src/v/config",
"//src/v/model",
"//src/v/pandaproxy/schema_registry:subject_name_strategy",
"//src/v/reflection:adl",
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,7 @@ ss::future<> controller::stop() {
co_await _data_migrated_resources.stop();
co_await _roles.stop();
co_await _credentials.stop();
co_await _partition_balancer_state.stop();
co_await _tp_state.stop();
co_await _members_manager.stop();
co_await _epoch_service.stop();
Expand Down
17 changes: 17 additions & 0 deletions src/v/cluster/feature_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "config/configuration.h"
#include "config/endpoint_tls_config.h"
#include "config/node_config.h"
#include "config/replicas_preference.h"
#include "config/sasl_mechanisms.h"
#include "config/tls_config.h"
#include "config/types.h"
Expand Down Expand Up @@ -273,6 +274,22 @@ feature_manager::report_enterprise_features() const {
report.set(
features::license_required_feature::leadership_pinning,
leadership_pinning_enabled());
// Reports whether replica pinning is in use: returns true as soon as any
// topic in the local topic table has a replicas_preference property that is
// set and differs from a default-constructed config::replicas_preference;
// returns false when no topic matches.
auto replica_pinning_enabled = [this] {
    const config::replicas_preference no_replicas_preference{};
    for (const auto& topic : _topic_table.local().topics_map()) {
        // Bind by reference: the property is only inspected here, so
        // copying the optional (and its payload) for every topic is wasted
        // work on each enterprise-feature report.
        const auto& replicas_preference
          = topic.second.get_configuration().properties.replicas_preference;
        if (
          replicas_preference.has_value()
          && replicas_preference.value() != no_replicas_preference) {
            return true;
        }
    }
    return false;
};
report.set(
features::license_required_feature::replica_pinning,
replica_pinning_enabled());
report.set(
features::license_required_feature::shadow_linking,
cfg.enable_shadow_linking());
Expand Down
17 changes: 11 additions & 6 deletions src/v/cluster/partition_balancer_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ bool partition_balancer_backend::is_enabled() const {
return is_leader() && !config::node().recovery_mode_enabled();
}

// Starts fresh per-term bookkeeping when there is no current term state yet
// or when the raft0 term no longer matches the recorded one. In that case
// _cur_term is replaced with a new per_term_state for the current raft0 term
// and the pinning cache held by _state is reset. Otherwise this is a no-op.
void partition_balancer_backend::maybe_advance_term() {
    const bool term_unchanged = _cur_term
                                && _raft0->term() == _cur_term->id;
    if (term_unchanged) {
        return;
    }

    _cur_term = per_term_state(_raft0->term());
    _state.reset_pinning_cache();
}

void partition_balancer_backend::start() {
_topic_table_updates = _state.topics().register_lw_ntp_notification(
[this]() { on_topic_table_update(); });
Expand Down Expand Up @@ -137,9 +144,7 @@ ss::future<std::error_code> partition_balancer_backend::request_rebalance() {
co_return errc::not_leader;
}

if (!_cur_term || _raft0->term() != _cur_term->id) {
_cur_term = per_term_state(_raft0->term());
}
maybe_advance_term();

if (
_cur_term->_ondemand_rebalance_requested
Expand Down Expand Up @@ -359,9 +364,7 @@ ss::future<> partition_balancer_backend::do_tick() {

vlog(clusterlog.debug, "tick");

if (!_cur_term || _raft0->term() != _cur_term->id) {
_cur_term = per_term_state(_raft0->term());
}
maybe_advance_term();

co_await _controller_stm.wait(
_raft0->committed_offset(),
Expand Down Expand Up @@ -448,6 +451,8 @@ ss::future<> partition_balancer_backend::do_tick() {

_cur_term->last_tick_time = clock_t::now();
_cur_term->last_violations = std::move(plan_data.violations);
_cur_term->last_pinning_violations_count
= plan_data.last_pinning_violations_count;
_cur_term->last_tick_reallocation_failures = std::move(
plan_data.reallocation_failures);
if (
Expand Down
14 changes: 14 additions & 0 deletions src/v/cluster/partition_balancer_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ class partition_balancer_backend {

partition_balancer_overview_reply overview() const;

// Returns the pinning-violation count recorded in the current term's state
// (stored there by the most recent planning tick). Yields zero when no
// per-term state exists yet.
// TODO: expose through partition_balancer_overview_reply (serde version
// bump) once we're ready to make this visible via the admin API.
size_t last_pinning_violations_count() const {
    if (!_cur_term) {
        return 0;
    }
    return _cur_term->last_pinning_violations_count;
}

ss::future<std::error_code> request_rebalance();

private:
Expand All @@ -84,6 +92,11 @@ class partition_balancer_backend {
/// If now, rearms to run immediately, else rearms to _tick_interval or
/// current timeout whichever is minimum.
void maybe_rearm_timer(bool now = false);

/// If the raft0 term has advanced since we last observed it, reset
/// per-term state. Must be called on shard 0 before any code that
/// relies on _cur_term or on per-term caches in partition_balancer_state.
void maybe_advance_term();
void on_members_update(model::node_id, model::membership_state);
void on_topic_table_update();
void on_health_monitor_update(
Expand Down Expand Up @@ -144,6 +157,7 @@ class partition_balancer_backend {
partition_balancer_status last_status
= partition_balancer_status::starting;
size_t last_tick_in_progress_updates = 0;
size_t last_pinning_violations_count = 0;

chunked_hash_map<model::ntp, reallocation_failure_details>
last_tick_reallocation_failures;
Expand Down
Loading
Loading