Skip to content
47 changes: 46 additions & 1 deletion deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@
notify_decorators/3,
spawn_notify_decorators/3]).

-export([get_member_with_highest_index/3,
get_member_with_highest_index/4]).

-export([is_enabled/0,
is_compatible/3,
declare/2,
Expand Down Expand Up @@ -1245,7 +1248,7 @@ key_metrics_rpc(ServerId) ->
Metrics = ra:key_metrics(ServerId),
Metrics#{machine_version => rabbit_fifo:version()}.

-spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) ->
-spec status(rabbit_types:vhost(), rabbit_misc:resource_name()) ->
[[{binary(), term()}]] | {error, term()}.
status(Vhost, QueueName) ->
%% Handle not found queues
Expand Down Expand Up @@ -1335,6 +1338,48 @@ get_sys_status(Proc) ->

end.

-spec get_member_with_highest_index(rabbit_types:vhost(), rabbit_misc:resource_name(), atom()) ->
[[{binary(), term()}]] | {error, term()}.
get_member_with_highest_index(Vhost, QueueName, IndexName) ->
get_member_with_highest_index(Vhost, QueueName, IndexName, false).

-spec get_member_with_highest_index(rabbit_types:vhost(), rabbit_misc:resource_name(), atom(), boolean()) ->
[[{binary(), term()}]] | {error, term()}.
get_member_with_highest_index(Vhost, QueueName, IndexName, IncludeOfflineMembers) ->
case ?MODULE:status(Vhost, QueueName) of
Status when is_list(Status) ->
IndexNameInternal = rabbit_data_coercion:to_atom(IndexName),
case index_name_to_status_key(IndexNameInternal) of
Key when is_binary(Key) ->
{_HighestIndexValue, HighestEntry} =
lists:foldl(
fun(Entry, {PreviousIndexValue, _PreviousEntry} = Acc) ->
State = rabbit_misc:pget(<<"Raft State">>, Entry),
case {rabbit_misc:pget(Key, Entry), IncludeOfflineMembers} of
{CurrentIndexValue, false} when is_integer(CurrentIndexValue),
CurrentIndexValue > PreviousIndexValue,
State /= noproc ->
{CurrentIndexValue, Entry};
{CurrentIndexValue, true} when is_integer(CurrentIndexValue),
CurrentIndexValue > PreviousIndexValue ->
{CurrentIndexValue, Entry};
_ ->
Acc
end
end, {-100, []}, Status),
Copy link
Contributor Author

@Ayanda-D Ayanda-D Jul 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just FYI, the -100 here is just an arbitrary number to initialise the accumulator, to ensure its less than the initial index value of -1, e.g. initial snapshot index = -1: https://github.com/rabbitmq/ra/blob/v2.16.11/src/ra_snapshot.erl#L195

[HighestEntry];
undefined ->
[]
end;
{error, _} = Error ->
Error
end.

index_name_to_status_key(I) when I =:= commit; I =:= commit_index -> <<"Commit Index">>;
index_name_to_status_key(I) when I =:= log; I =:= log_index -> <<"Last Log Index">>;
index_name_to_status_key(I) when I =:= snapshot; I =:= snapshot_index -> <<"Snapshot Index">>;
index_name_to_status_key(_I) -> undefined.

add_member(VHost, Name, Node, Membership, Timeout)
when is_binary(VHost) andalso
is_binary(Name) andalso
Expand Down
109 changes: 108 additions & 1 deletion deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ groups() ->
node_removal_is_not_quorum_critical,
select_nodes_with_least_replicas,
select_nodes_with_least_replicas_node_down,
subscribe_from_each
subscribe_from_each,
get_member_with_highest_index


]},
Expand Down Expand Up @@ -365,6 +366,8 @@ init_per_testcase(Testcase, Config) ->
{skip, "peek_with_wrong_queue_type isn't mixed versions compatible"};
cancel_consumer_gh_3729 when IsMixed andalso RabbitMQ3 ->
{skip, "this test is not compatible with RabbitMQ 3.13.x"};
get_member_with_highest_index when IsMixed ->
{skip, "get_member_with_highest_index isn't mixed versions compatible"};
_ ->
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
Expand Down Expand Up @@ -4576,6 +4579,110 @@ leader_health_check(Config) ->
amqp_connection:close(Conn1),
amqp_connection:close(Conn2).

get_member_with_highest_index(Config) ->
[Node1, Node2, Node3, Node4, Node5] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

Q = ?config(queue_name, Config),
VHost = <<"/">>,

Statuses =
%% [{Node, Member, LogIdx, CommitIdx, SnapshotIdx}, ...]
[{Node1, leader, 1015, 1010, 1010}, %% highest SnapshotIdx
{Node2, follower, 1015, 1010, 1010}, %% highest SnapshotIdx (duplicate)
{Node3, follower, 1013, 1013, 1009}, %% highest CommitIdx
{Node4, follower, 1016, 1009, undefined}, %% highest LogIdx
{Node5, noproc, 1050, 1050, 1050}], %% highest but noproc

Term = 1,
MachineVersion = 7,

meck:new(rabbit_quorum_queue, [passthrough, no_link]),
meck:expect(
rabbit_quorum_queue, status,
fun(_, _) ->
[[{<<"Node Name">>, Node},
{<<"Raft State">>, Member},
{<<"Last Log Index">>, LogIndex},
{<<"Last Written">>, LogIndex},
{<<"Last Applied">>, LogIndex},
{<<"Commit Index">>, CommitIndex},
{<<"Snapshot Index">>, SnapshotIdx},
{<<"Term">>, Term},
{<<"Machine Version">>, MachineVersion}]
|| {Node, Member, LogIndex, CommitIndex, SnapshotIdx} <- Statuses]
end),

ct:pal("quorum status: ~tp", [rabbit_quorum_queue:status(VHost, Q)]),

ExpectedHighestLogIdx =
[[{<<"Node Name">>, Node4},
{<<"Raft State">>, follower},
{<<"Last Log Index">>, 1016},
{<<"Last Written">>,1016},
{<<"Last Applied">>,1016},
{<<"Commit Index">>, 1009},
{<<"Snapshot Index">>, undefined},
{<<"Term">>, Term},
{<<"Machine Version">>, MachineVersion}]],

[?assertEqual(ExpectedHighestLogIdx,
rabbit_quorum_queue:get_member_with_highest_index(VHost, Q, I)) || I <- [log, log_index]],

ExpectedHighestCommitIdx =
[[{<<"Node Name">>, Node3},
{<<"Raft State">>, follower},
{<<"Last Log Index">>, 1013},
{<<"Last Written">>,1013},
{<<"Last Applied">>,1013},
{<<"Commit Index">>, 1013},
{<<"Snapshot Index">>, 1009},
{<<"Term">>, Term},
{<<"Machine Version">>, MachineVersion}]],

[?assertEqual(ExpectedHighestCommitIdx,
rabbit_quorum_queue:get_member_with_highest_index(VHost, Q, I)) || I <- [commit, commit_index]],

ExpectedHighestSnapshotIdx =
[[{<<"Node Name">>, Node1},
{<<"Raft State">>, leader},
{<<"Last Log Index">>, 1015},
{<<"Last Written">>,1015},
{<<"Last Applied">>,1015},
{<<"Commit Index">>, 1010},
{<<"Snapshot Index">>, 1010},
{<<"Term">>, Term},
{<<"Machine Version">>, MachineVersion}]],
% Duplicate:
% [{<<"Node Name">>, Node2},
% {<<"Raft State">>, follower},
% {<<"Last Log Index">>, 1015},
% {<<"Last Written">>,1015},
% {<<"Last Applied">>,1015},
% {<<"Commit Index">>, 1010},
% {<<"Snapshot Index">>, 1010},
% {<<"Term">>, Term},
% {<<"Machine Version">>, MachineVersion}],

[?assertEqual(ExpectedHighestSnapshotIdx,
rabbit_quorum_queue:get_member_with_highest_index(VHost, Q, I)) || I <- [snapshot, snapshot_index]],

ExpectedHighestIdxForAll =
[[{<<"Node Name">>, Node5},
{<<"Raft State">>, noproc},
{<<"Last Log Index">>, 1050},
{<<"Last Written">>,1050},
{<<"Last Applied">>,1050},
{<<"Commit Index">>, 1050},
{<<"Snapshot Index">>, 1050},
{<<"Term">>, Term},
{<<"Machine Version">>, MachineVersion}]],

[?assertEqual(ExpectedHighestIdxForAll,
rabbit_quorum_queue:get_member_with_highest_index(VHost, Q, I, true))
|| I <- [log, log_index, commit, commit_index, snapshot, snapshot_index]],

ok.

leader_locator_client_local(Config) ->
[Server1 | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term β€œBroadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

defmodule RabbitMQ.CLI.Queues.Commands.MemberWithHighestIndexCommand do
alias RabbitMQ.CLI.Core.DocGuide
import RabbitMQ.CLI.Core.DataCoercion

@behaviour RabbitMQ.CLI.CommandBehaviour

use RabbitMQ.CLI.Core.AcceptsOnePositionalArgument
use RabbitMQ.CLI.Core.RequiresRabbitAppRunning

def switches(), do: [offline_members: :boolean, index: :string, timeout: :integer]
def aliases(), do: [o: :offline_members, i: :index, t: :timeout]

def merge_defaults(args, opts) do
{args, Map.merge(%{vhost: "/", index: "commit", offline_members: false}, opts)}
end

def run([name] = _args, %{vhost: vhost, index: index, node: node_name, offline_members: offline_members}) do
case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :get_member_with_highest_index, [
vhost,
name,
to_atom(String.downcase(index)),
offline_members
]) do
{:error, :classic_queue_not_supported} ->
index = format_index(String.downcase(index))
{:error, "Cannot get #{index} index from a classic queue"}

{:error, :not_found} ->
{:error, {:not_found, :queue, vhost, name}}

other ->
other
end
end

use RabbitMQ.CLI.DefaultOutput

def formatter(), do: RabbitMQ.CLI.Formatters.PrettyTable

def usage, do: "member_with_highest_index <queue> [--vhost <vhost>] [--offline-members] [--index <commit|commit_index|log|log_index|snapshot|snapshot_index>]"

def usage_additional do
[
["<queue>", "quorum queue name"],
["--offline-members", "include members which are down (in noproc state)"],
["--index <commit|commit_index|log|log_index|snapshot|snapshot_index>", "name of the index to use to lookup highest member"]
]
end

def usage_doc_guides() do
[
DocGuide.quorum_queues()
]
end

def help_section, do: :replication

def description, do: "Look up first member of a quorum queue with the highest commit, log or snapshot index."

def banner([name], %{node: node, index: index, vhost: vhost}) do
index = format_index(String.downcase(index))
"Member with highest #{index} index for queue #{name} in vhost #{vhost} on node #{node}..."
end

defp format_index("log_index"), do: "log"
defp format_index("commit_index"), do: "commit"
defp format_index("snapshot_index"), do: "snapshot"
defp format_index(index_name), do: index_name
end
55 changes: 55 additions & 0 deletions deps/rabbitmq_cli/test/queues/member_with_highest_index_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term β€œBroadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

defmodule RabbitMQ.CLI.Queues.Commands.MemberWithHighestIndexCommandTest do
use ExUnit.Case, async: false
import TestHelper

@command RabbitMQ.CLI.Queues.Commands.MemberWithHighestIndexCommand

setup_all do
RabbitMQ.CLI.Core.Distribution.start()

:ok
end

setup context do
{:ok,
opts: %{
node: get_rabbit_hostname(),
timeout: context[:test_timeout] || 30000
}}
end

test "validate: when no arguments are provided, returns a failure" do
assert @command.validate([], %{}) == {:validation_failure, :not_enough_args}
end

test "validate: when two or more arguments are provided, returns a failure" do
assert @command.validate(["quorum-queue-a", "one-extra-arg"], %{}) ==
{:validation_failure, :too_many_args}

assert @command.validate(
["quorum-queue-a", "extra-arg", "another-extra-arg"],
%{}
) == {:validation_failure, :too_many_args}
end

test "validate: treats one positional arguments and default switches as a success" do
assert @command.validate(["quorum-queue-a"], %{}) == :ok
end

@tag test_timeout: 3000
test "run: targeting an unreachable node throws a badrpc" do
assert match?(
{:badrpc, _},
@command.run(
["quorum-queue-a"],
%{node: :jake@thedog, vhost: "/", index: "log", offline_members: true, timeout: 200}
)
)
end
end
Loading