replica_pinning: add priority-based replica pinning #10
Open
joe-redpanda wants to merge 16 commits into dev from
Conversation
Force-pushed from 1ba5a68 to a62539b.
joe-redpanda pushed a commit that referenced this pull request on Apr 20, 2026:
Taking an r-value reference to `req` in `check_ntp_states_locally()` could
result in a heap-use-after-free [1], because `req` is never `std::move()`'d
within the function and `check_ntp_states_locally()` is invoked via
continuation from `backend::send_rpc()`.
Change `check_ntp_states_locally()` to take `req` by value instead.
[1]:
```
#0 0xaaaad89d8a18 in chunked_vector<cluster::data_migrations::data_migration_ntp_state>::iter<false>::check_generation() const bazel-out/aarch64-dbg/bin/src/v/container/_virtual_includes/chunked_vector/container/chunked_vector.h:474:13
#1 0xaaaad89d8a18 in chunked_vector<cluster::data_migrations::data_migration_ntp_state>::iter<false>::operator==(chunked_vector<cluster::data_migrations::data_migration_ntp_state>::iter<false> const&) const bazel-out/aarch64-dbg/bin/src/v/container/_virtual_includes/chunked_vector/container/chunked_vector.h:439:13
#2 0xaaaad88ff614 in seastar::future<void> ssx::detail::async_for_each_coro<ssx::async_algo_traits, ssx::detail::internal_counter, cluster::data_migrations::backend::check_ntp_states_locally(cluster::data_migrations::check_ntp_states_request&&)::$_0, chunked_vector<cluster::data_migrations::data_migration_ntp_state>::iter<false>, chunked_vector<cluster::data_migrations::data_migration_ntp_state>::iter<false>>(ssx::detail::internal_counter, T2, chunked_vector<cluster::data_migrations::data_migration_ntp_state>::iter<false>, cluster::data_migrations::backend::check_ntp_states_locally(cluster::data_migrations::check_ntp_states_request&&)::$_0) (.resume) bazel-out/aarch64-dbg/bin/src/v/ssx/_virtual_includes/async_algorithm/ssx/async_algorithm.h:148:20
#3 0xaaaace22a4dc in std::__1::coroutine_handle<seastar::internal::coroutine_traits_base<void>::promise_type>::resume[abi:se200100]() const external/toolchains_llvm++llvm+current_llvm_toolchain/bin/../../toolchains_llvm++llvm+current_llvm_toolchain_llvm/bin/../include/c++/v1/__coroutine/coroutine_handle.h:144:5
#4 0xaaaace22a4dc in seastar::internal::coroutine_traits_base<void>::promise_type::run_and_dispose() external/+non_module_dependencies+seastar/include/seastar/core/coroutine.hh:171:20
#5 0xaaaadde45c2c in seastar::reactor::task_queue::run_tasks() external/+non_module_dependencies+seastar/src/core/reactor.cc:2747:14
#6 0xaaaadde4c594 in seastar::reactor::task_queue_group::run_tasks() external/+non_module_dependencies+seastar/src/core/reactor.cc:3251:27
#7 0xaaaadde4c034 in seastar::reactor::task_queue_group::run_some_tasks() external/+non_module_dependencies+seastar/src/core/reactor.cc:3235:5
#8 0xaaaadde4dcac in seastar::reactor::do_run() external/+non_module_dependencies+seastar/src/core/reactor.cc:3418:20
#9 0xaaaadde4cc44 in seastar::reactor::run() external/+non_module_dependencies+seastar/src/core/reactor.cc:3295:16
#10 0xaaaaddc381d4 in seastar::app_template::run_deprecated(int, char**, std::__1::function<void ()>&&) external/+non_module_dependencies+seastar/src/core/app-template.cc:266:31
#11 0xaaaaddc36a4c in seastar::app_template::run(int, char**, std::__1::function<seastar::future<int> ()>&&) external/+non_module_dependencies+seastar/src/core/app-template.cc:160:12
#12 0xaaaace07c3b8 in application::run(int, char**) src/v/redpanda/application.cc:312:16
#13 0xaaaace034788 in main src/v/redpanda/main.cc:22:16
#14 0xffffaf0073f8 (/opt/redpanda_installs/ci/lib/libc.so.6+0x273f8) (BuildId: 2a450fe74d1b79a321cc1b12337fc31a2c3fb834)
#15 0xffffaf0074c8 in __libc_start_main (/opt/redpanda_installs/ci/lib/libc.so.6+0x274c8) (BuildId: 2a450fe74d1b79a321cc1b12337fc31a2c3fb834)
#16 0xaaaacdf503ec in _start (/opt/redpanda_installs/ci/libexec/redpanda+0x1a3e03ec) (BuildId: 52528f0683dceb3bfb7940a4869a87f6)
```
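The ownership fix above can be sketched in isolation. This is a minimal illustration, not the actual Redpanda code: `request`, `check_by_value`, and `call_site` are hypothetical stand-ins showing why pass-by-value is safe where a never-moved r-value reference can dangle once the caller's object is gone.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for check_ntp_states_request: a request whose
// states the callee iterates after the caller may have returned.
struct request {
    std::vector<std::string> states;
};

// BROKEN shape (for contrast): `reply check_broken(request&& req);`
// An r-value reference does not extend the referent's lifetime. If req
// is never std::move()'d inside and the callee runs as a continuation,
// the reference can dangle after the caller's object is destroyed.

// FIXED shape: take req by value, so the callee owns the object for as
// long as it needs it.
std::size_t check_by_value(request req) {
    // req is owned here; iterating is safe regardless of the caller.
    return req.states.size();
}

std::size_t call_site() {
    request req{{"ntp-0", "ntp-1"}};
    // std::move hands ownership into the callee, mirroring how
    // backend::send_rpc() hands the request into its continuation.
    return check_by_value(std::move(req));
}
```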
Force-pushed from 0440f0b to eef9e4b.
Define the config::replicas_preference type with CSR-style storage
(flat rack_ids array + group_offsets index) for priority-ordered rack
groups. Add parsing for the "racks: A, {B, C}, D" format, serde,
format_to, and group_index_for() lookup. Integrate into topic_properties
as an optional field, bumping serde from v13 to v14, and into
incremental_topic_updates for in-place alter, bumping its serde from
v10 to v11.
The CSR layout is cache-friendly and deterministic for serialization,
beating hash map overhead at the typical cardinality (<30 racks).
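The CSR layout described above can be sketched as follows. Names and members here are illustrative, not the actual `config::replicas_preference` definition: for `racks: A, {B, C}, D`, a flat `rack_ids` array holds `A, B, C, D` and `group_offsets` marks where each priority group starts.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <vector>

// Sketch of CSR-style storage for priority-ordered rack groups.
// For "racks: A, {B, C}, D":
//   rack_ids      = {"A", "B", "C", "D"}
//   group_offsets = {0, 1, 3, 4}  // group g spans [offsets[g], offsets[g+1])
struct replicas_preference_sketch {
    std::vector<std::string> rack_ids;
    std::vector<std::size_t> group_offsets; // size = group count + 1

    // Returns the priority group containing `rack`, if any.
    std::optional<std::size_t>
    group_index_for(const std::string& rack) const {
        for (std::size_t g = 0; g + 1 < group_offsets.size(); ++g) {
            for (std::size_t i = group_offsets[g];
                 i < group_offsets[g + 1];
                 ++i) {
                if (rack_ids[i] == rack) {
                    return g;
                }
            }
        }
        return std::nullopt; // rack not in the preference list
    }
};
```

The linear scan is deliberate: at fewer than ~30 racks, two contiguous arrays beat a hash map on both lookup cost and serialization determinism.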
Wire replicas_preference through the Kafka protocol layer: create, alter, incremental alter, and describe configs handlers.

- Add enterprise license gating via a validator in both alter paths (blocks setting a racks-type preference without a valid license).
- Add to cluster_link as a disallowed property (rack topology is cluster-local).
- Add a feature_manager report hook driven by scanning topic_table for any topic with a non-default preference set.
- No cluster-wide default: this is strictly a per-topic property.
- describe_configs uses the no-synonym overload.
Implement the replica_pinning_preferred soft constraint as a pure priority scorer: group k scores max_score/(k+1); rackless and unpreferred nodes score 0. Insert it at L1 in the constraint hierarchy, between rack diversity (L0) and topic count balance (L2).

Add balancer repair: the planner scans all topics with replicas_preference set, detects violations by comparing the actual group assignment against the fill-then-overflow ideal, and moves replicas from lower-priority to higher-priority racks. Violation detection runs on demand during the planner tick rather than being eagerly maintained in partition_balancer_state.

Capacity is rack-awareness-aware: it counts distinct racks per group when rack awareness is ON and total nodes when it is OFF. This prevents the repair logic from flagging violations that the allocator's rack diversity constraint (L0) cannot resolve. A per-term last_pinning_violations_count is exposed on the backend for diagnostics.
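The scoring rule is small enough to state in code. This is a sketch of the formula described above, with an illustrative `max_score` constant rather than the real constraint API: group 0 gets full score, group k gets max_score/(k+1), and nodes outside the preference list (or without a rack) get 0.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>

// Illustrative constant; the real allocator defines its own max_score.
constexpr uint64_t max_score = 10'000'000;

// Pure priority scorer: higher-priority (lower-index) groups score more.
// Two nodes in the same group score equally, so placement within a group
// is decided by the lower constraint levels.
uint64_t pinning_score(std::optional<std::size_t> group_index) {
    if (!group_index.has_value()) {
        // rackless node, or rack not in the preference list
        return 0;
    }
    return max_score / (*group_index + 1);
}
```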
Force-pushed from eef9e4b to 35411e9.
Add C++ test coverage for replica pinning:

- Planner tests: repair moves replicas to higher-priority racks, violation detection, no-op when optimal, multiple violations, and fill-then-overflow tests for rack awareness ON/OFF with varying capacity scenarios.
- Scoring tests (separate file): constraint scoring verification (group 0 = max_score, group k = max_score/(k+1), same-group equality, unpreferred = 0, rackless = 0).
- Simulator tests (separate file): end-to-end balancer convergence with rack awareness ON and OFF variants. Extract the sim fixture to a shared header so the pinning sim and the existing simulator test share the same harness.
- Topic properties tests: serde roundtrip with/without replicas_preference, backward compatibility v13 -> v14.
Add ducktape integration tests covering the full acceptance criteria:
1. Pinning with rack awareness — preferred racks chosen, one per rack
2. Pinning without rack awareness — all replicas concentrate in top rack
3. Priority ordering — higher-priority racks chosen first
4. Group notation — {B,C,D} get roughly equal counts (statistical)
5. Fallback to unpreferred — graceful placement outside preference list
6. Balancer repair — replicas promoted back after node recovery
7. Alter/remove preference — changing triggers rebalance, removing stops
8. Rackless nodes — deprioritized, not selected when preferred available
9a. Spread within rack (RA ON) — replicas distributed across nodes
9b. Spread within rack (RA OFF) — concentrated replicas still spread
Statistical tests apply a 5% margin to derived thresholds to account for
jitter from internal topic replicas.
Introduces an empty btree_set tracking topics with replicas_preference set, plus a lazy seed method. Wiring to topic_table delta notifications comes in a follow-up commit. Planner integration is separate.
35411e9 to
aa1bd2a
Compare
Adds a self-contained gtest fixture (the existing topic_table and planner fixtures are boost-only) and a first test covering the seed path before any topic_table notifications arrive.
Registers a topic_table delta notification on shard 0. On added and properties_updated, the metadata is re-read and the cache entry is set or unset accordingly. On removed, the topic is dropped. The cache is only updated after initial seeding; deltas before seeding are ignored because the upcoming seed scan will observe the authoritative state.
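The delta-handling rules above can be sketched with simplified types. Everything here is illustrative: the real code keys on topic/NTP identifiers, reads topic_table, and uses a btree_set, while this sketch uses a `std::map` stand-in for the table and `std::set` for the cache.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

enum class delta { added, properties_updated, removed };

struct pinning_cache_sketch {
    // topic -> whether replicas_preference is set (topic_table stand-in)
    const std::map<std::string, bool>* table;
    std::set<std::string> pinned; // btree_set stand-in
    bool seeded = false;

    // Lazy seed: one authoritative scan of the table.
    void seed() {
        pinned.clear();
        for (const auto& [topic, has_pref] : *table) {
            if (has_pref) pinned.insert(topic);
        }
        seeded = true;
    }

    void on_delta(const std::string& topic, delta d) {
        if (!seeded) return; // the upcoming seed scan is authoritative
        if (d == delta::removed) {
            pinned.erase(topic);
            return;
        }
        // added / properties_updated: re-read and set or unset
        auto it = table->find(topic);
        if (it != table->end() && it->second) {
            pinned.insert(topic);
        } else {
            pinned.erase(topic);
        }
    }
};
```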
Matches the invalidation pattern used by leader_balancer for its _last_seen_preferences map. Guards against any drift that could have accumulated while this node was not the controller leader.
The replica-pinning repair loop previously scanned every topic in topic_table each tick and filtered out topics without a preference set. With partition_balancer_state now tracking pinned topics incrementally, the loop iterates that cache directly — O(pinned) instead of O(topics). No behavior change — existing planner tests continue to cover the repair path end-to-end.
apply_snapshot rebuilds the topic_table from scratch, so any deltas the pinning-cache notification callback observed before the snapshot are no longer authoritative. Call reset_pinning_cache() alongside the existing _ntps_with_broken_rack_constraint reset so the next planner tick re-seeds from the rebuilt topic_table.
Split replica pinning into two phases to avoid heavy calculations before the early exit: a first phase detects violations, and a second phase reallocates the replicas.
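The two-phase split can be sketched as follows. The types are illustrative (the real planner works on partition/replica structures, not index vectors): a cheap detection pass runs first, and when it finds nothing the tick exits before any reallocation work.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

struct violation {
    std::size_t partition;
    std::size_t from_group; // current (lower-priority) group
    std::size_t to_group;   // ideal (higher-priority) group
};

// Phase 1: cheap scan comparing each partition's current priority group
// against the fill-then-overflow ideal. An empty result means the tick
// can return early.
std::vector<violation>
detect(const std::vector<std::size_t>& actual,
       const std::vector<std::size_t>& ideal) {
    std::vector<violation> v;
    for (std::size_t p = 0; p < actual.size(); ++p) {
        if (actual[p] > ideal[p]) { // sitting in a lower-priority group
            v.push_back({p, actual[p], ideal[p]});
        }
    }
    return v;
}

// Phase 2: only runs when phase 1 found work; moves replicas to their
// higher-priority target groups and reports how many were repaired.
std::size_t repair(std::vector<std::size_t>& actual,
                   const std::vector<violation>& violations) {
    for (const auto& v : violations) {
        actual[v.partition] = v.to_group;
    }
    return violations.size();
}
```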
Add tests for the early exit and the violations count.
Mirrors the new license_required_feature::replica_pinning entry added to the C++ enum. Without this, every parameterization of test_enable_features fails with a KeyError when the server's feature report includes "replica_pinning" (the Feature[f["name"]] lookup fails).

Unlike other features, replica_pinning has no cluster-wide config; it is activated solely via the per-topic Kafka property redpanda.replicas.preference. The license gate lives in topics_frontend::do_create_topic, so rpk surfaces an RpkException rather than the Admin API's HTTPError. The expect_exception block accepts either exception type, and for RpkException the validator matches the generic "Enterprise ... license" rejection wording that comes from features::enterprise_error_message::topic_property.
The new topic property is emitted by the describe handler, so test_describe_topics_with_documentation_and_types fails on the "assert name in properties" check while parsing the docs section.
Force-pushed from aa1bd2a to 6cc412b.
Summary
replicas_preference: per-topic property with priority-ordered rack groups (racks: A, {B, C}, D) using CSR-style storage for cache-friendly lookup.

Test plan
- `bazel test //src/v/config/tests:replicas_preference_test`: parsing, serde, CSR internals, edge cases
- `bazel test //src/v/cluster/tests:partition_balancer_planner_test`: constraint scoring, repair behavior, violation tracking (10 new tests)
- `bazel test //src/v/cluster/tests:partition_balancer_simulator_test`: end-to-end balancer convergence
- `bazel test //src/v/cluster/tests:topic_properties_test`: serde roundtrip, backward compat v13->v14
- `bazel test //src/v/kafka/server/tests:alter_config_test`: Kafka API wiring