Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions deps/rabbit/src/rabbit_db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ clear_init_finished() ->
%% @doc Resets the database and the node.

reset() ->
ok = rabbit:stop(),
ok = case rabbit_khepri:is_enabled() of
true -> reset_using_khepri();
false -> reset_using_mnesia()
Expand Down Expand Up @@ -196,6 +197,7 @@ force_load_on_next_boot_using_mnesia() ->
rabbit_mnesia:force_load_next_boot().

post_reset() ->
wipe_data_dir(),
rabbit_feature_flags:reset(),

%% The cluster status files that RabbitMQ uses when Mnesia is the database
Expand All @@ -210,6 +212,17 @@ post_reset() ->

ok.

wipe_data_dir() ->
DataDir = dir(),
Glob = filename:join(DataDir, "*"),
FilesToRemove = filelib:wildcard(Glob),
?LOG_DEBUG(
"DB: wipe files in data directory `~ts`:~p",
[DataDir, FilesToRemove],
#{domain => ?RMQLOG_DOMAIN_DB}),
ok = rabbit_file:recursive_delete(FilesToRemove),
ok.

%% -------------------------------------------------------------------
%% is_virgin_node().
%% -------------------------------------------------------------------
Expand Down
290 changes: 275 additions & 15 deletions deps/rabbit/src/rabbit_db_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,108 @@
%% refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

%% TODO
%%
%% Opérations sur le cluster :
%%
%% En commun :
%% 1. On stoppe "rabbit" sur le nœud à ajouter/retirer
%% 2. On lock le registre des feature flags
%%
%% Join :
%% * reset du nœud (cf ci-dessous)
%% * copie des feature flags (+ reset registre)
%% * Mnesia :
%% * init avec checks
%% Khepri :
%% * setup
%% * préparation
%% * join
%% * reset registre feature flags (utile ?)
%% * notify_joined_cluster
%%
%% Reset/forget member, depuis un autre nœud que celui retiré :
%% * liste des membres
%% * Mnesia :
%% * online :
%% * offline :
%% Khepri :
%% * online :
%% * offline :
%% * amqqueue:forget_all()
%% * quorum queue shrink_all
%% * stream queue delete_all_replicas
%% * stream coordinator forget_node
%% * notify_left_cluster
%% * suppression des fichiers
%% * reset registre feature flags
%% * cluster status
%% * déconnexion du nœud (?)
%%
%% En commun :
%% 1. On unlock le registre des feature flags
%% 2. On redémarre "rabbit"
%%
%% À exécuter depuis quel nœud ?
%% [ ] depuis le nœud qu’ajoute/retire
%% [x] depuis le cluster qui est modifié
%% (doutes : risqué si le nœud à ajouter fait partie d’un autre cluster avant
%% son reset)
%%
%% JOIN (depuis le nœud qu’on ajoute)
%%
%% * est-ce que le nœud fait partie du cluster cible ?
%% * sinon :
%%
%% * stoppe rabbit
%%
%% * reset (cf ci-dessous)
%%
%% * lock registre feature flags
%% * can join ?
%% * copy feature flags + reset registre
%% * (Mnesia/Khepri specific)
%% * notify_joined_cluster
%% * unlock registre feature flags
%%
%% * start rabbit
%%
%% RESET
%%
%% * stoppe rabbit
%%
%% * lock registre feature flags
%% * en cluster ? oui -> forget member locked (cf ci-dessous)
%% * suppression des fichiers
%% * reset registre feature flags
%% * cluster status
%% * unlock registre feature flags
%% * déco des anciens collègues de cluster
%%
%% * start rabbit
%%
%% FORGET MEMBER
%%
%% * stoppe rabbit sur nœud distant
%% * lock registre feature flags
%%
%% * (Mnesia/Khepri specific)
%% * amqqueue:forget_all()
%% * quorum queue shrink_all
%% * stream queue delete_all_replicas
%% * stream coordinator forget_node
%% * notify_left_cluster
%%
%% * unlock registre feature flags
%% * start rabbit sur nœud distant
%% * déco de l’ancien membre
%%
%% (on ne redémarre pas rabbit ; est-ce qu’on arrête la VM ?)

-module(rabbit_db_cluster).

-include_lib("kernel/include/logger.hrl").
-include_lib("stdlib/include/assert.hrl").

-include_lib("rabbit_common/include/logging.hrl").

Expand Down Expand Up @@ -141,6 +240,13 @@ join(RemoteNode, NodeType)
ok = rabbit_db:reset(),
rabbit_feature_flags:copy_feature_states_after_reset(
RemoteNode)
catch
Class:Reason:Stacktrace ->
?LOG_ERROR(
"DB: failed to reset node before adding it to a "
"cluster: ~p:~p:~p", [Class, Reason, Stacktrace],
#{domain => ?RMQLOG_DOMAIN_DB}),
erlang:raise(Class, Reason, Stacktrace)
after
rabbit_ff_registry_factory:release_state_change_lock()
end,
Expand Down Expand Up @@ -253,28 +359,71 @@ join_using_khepri(_ClusterNodes, ram = NodeType) ->
RemoveWhenOffline :: boolean().
%% @doc Removes `Node' from the cluster.

forget_member(Node, RemoveWhenOffline) ->
case forget_member0(Node, RemoveWhenOffline) of
ok ->
rabbit_node_monitor:notify_left_cluster(Node);
Error ->
Error
end.

forget_member0(Node, RemoveWhenOffline) ->
forget_member(Node, RemoveWhenOffline) when is_atom(Node) ->
case rabbit:is_running(Node) of
false ->
?LOG_DEBUG(
"DB: removing cluster member `~ts`", [Node],
#{domain => ?RMQLOG_DOMAIN_DB}),
case rabbit_khepri:is_enabled() of
true -> forget_member_using_khepri(Node, RemoveWhenOffline);
false -> forget_member_using_mnesia(Node, RemoveWhenOffline)
{ok, InitialState} = lock_cluster_changes(Node),
try
forget_member_locked(Node, RemoveWhenOffline)
after
unlock_cluster_changes(InitialState)
end;
true ->
{error, {failed_to_remove_node, Node, rabbit_still_running}}
end.

forget_member_locked(Node, RemoveWhenOffline)
when is_atom(Node) andalso Node =/= node() ->
?LOG_DEBUG(
"DB: removing cluster member `~ts`", [Node],
#{domain => ?RMQLOG_DOMAIN_DB}),
?assertNot(rabbit:is_running(Node)),
Ret = case rabbit_khepri:is_enabled() of
true -> forget_member_using_khepri(Node, RemoveWhenOffline);
false -> forget_member_using_mnesia(Node, RemoveWhenOffline)
end,
case Ret of
ok -> post_forget_member_locked(Node, RemoveWhenOffline);
_ -> ok
end,
Ret;
forget_member_locked(Node, RemoveWhenOffline)
when is_atom(Node) andalso Node =:= node() ->
OtherNodes = members() -- [Node],
forget_member_locked_remotely(OtherNodes, Node, RemoveWhenOffline).

forget_member_locked_remotely([OtherNode | Rest], Node, RemoveWhenOffline) ->
try
?LOG_DEBUG(
"DB: removing cluster member `~ts` (this node); doing it from "
"remote node `~s`",
[Node, OtherNode],
#{domain => ?RMQLOG_DOMAIN_DB}),
Ret = erpc:call(
OtherNode,
?MODULE, forget_member_locked, [Node, RemoveWhenOffline]),
case Ret of
ok ->
ok;
Error ->
?LOG_DEBUG(
"DB: failed to remove cluster member `~ts` from node "
"`~s`: ~0p",
[Node, OtherNode, Error],
#{domain => ?RMQLOG_DOMAIN_DB}),
forget_member_locked_remotely(Rest, Node, RemoveWhenOffline)
end
catch
_:Reason ->
?LOG_DEBUG(
"DB: failed to remove cluster member `~ts` from node `~s`: ~0p",
[Node, OtherNode, Reason],
#{domain => ?RMQLOG_DOMAIN_DB}),
forget_member_locked_remotely(Rest, Node, RemoveWhenOffline)
end;
forget_member_locked_remotely([], _Node, _RemoveWhenOffline) ->
ok.

forget_member_using_mnesia(Node, RemoveWhenOffline) ->
rabbit_mnesia:forget_cluster_node(Node, RemoveWhenOffline).

Expand All @@ -287,6 +436,117 @@ forget_member_using_khepri(_Node, true) ->
forget_member_using_khepri(Node, false = _RemoveWhenOffline) ->
rabbit_khepri:remove_member(Node).

post_forget_member_locked(Node, false = _RemoveWhenOffline) ->
?LOG_DEBUG(
"DB: removing node `~s` from various Ra clusters", [Node],
#{domain => ?RMQLOG_DOMAIN_DB}),
_ = rabbit_amqqueue:forget_all(Node),
_ = rabbit_quorum_queue:shrink_all(Node),
_ = rabbit_stream_queue:delete_all_replicas(Node),
_ = rabbit_stream_coordinator:forget_node(Node),
rabbit_node_monitor:notify_left_cluster(Node),
ok;
post_forget_member_locked(_Node, true = _RemoveWhenOffline) ->
ok.

lock_cluster_changes(ChangingNode) ->
RabbitWasRunning = stop_rabbit_if_running(ChangingNode),
InitialState = #{changing_node => ChangingNode,
rabbit_was_running => RabbitWasRunning},

%% We acquire the feature flags registry reload lock because between
%% the time we reset the registry (as part of `rabbit_db:reset/0' and
%% the states copy from the remote node, there could be a concurrent
%% reload of the registry (for instance because of peer discovery on
%% another node) with the default/empty states.
%%
%% To make this work, the lock is also acquired from
%% `rabbit_ff_registry_wrapper'.
?LOG_DEBUG(
"DB: lock feature flags registry to avoid concurrent changes to the "
"cluster from a feature flag callback",
#{domain => ?RMQLOG_DOMAIN_DB}),
rabbit_ff_registry_factory:acquire_state_change_lock(),
{ok, InitialState}.

stop_rabbit_if_running(ThisNode) when ThisNode =:= node() ->
RabbitWasRunning = rabbit:is_running(),
case RabbitWasRunning of
true ->
?LOG_DEBUG(
"DB: stop \"rabbit\" on this node (~ts) before making changes "
"to the cluster",
[ThisNode],
#{domain => ?RMQLOG_DOMAIN_DB}),
ok = rabbit:stop();
false ->
?LOG_DEBUG(
"DB: \"rabbit\" already stopped on this node (~ts), ready for "
"changes to the cluster",
[ThisNode],
#{domain => ?RMQLOG_DOMAIN_DB}),
ok
end,
RabbitWasRunning;
stop_rabbit_if_running(RemoteNode) when is_atom(RemoteNode) ->
try
RabbitWasRunning = erpc:call(RemoteNode, rabbit, is_running, []),
case RabbitWasRunning of
true ->
?LOG_DEBUG(
"DB: stop \"rabbit\" on node `~ts` before making changes "
"to the cluster",
[RemoteNode],
#{domain => ?RMQLOG_DOMAIN_DB}),
ok = erpc:call(RemoteNode, rabbit, stop, []);
false ->
?LOG_DEBUG(
"DB: \"rabbit\" already stopped on node `~ts`, ready for "
"changes to the cluster",
[RemoteNode],
#{domain => ?RMQLOG_DOMAIN_DB}),
ok
end,
RabbitWasRunning
catch
error:{erpc, noconnection} ->
?LOG_DEBUG(
"DB: node `~ts` unreachable, considering that \"rabbit\" is "
"stopped on it, ready for changes to the cluster",
[RemoteNode],
#{domain => ?RMQLOG_DOMAIN_DB}),
false
end.

unlock_cluster_changes(
#{changing_node := ChangingNode,
rabbit_was_running := RabbitWasRunning}) ->
rabbit_ff_registry_factory:release_state_change_lock(),
start_rabbit_if_was_running(ChangingNode, RabbitWasRunning),
ok.

start_rabbit_if_was_running(ChangingNode, false = _RabbitWasRunning) ->
?LOG_DEBUG(
"DB: leaving \"rabbit\" stopped on node `~ts` after changes to the "
"cluster",
[ChangingNode],
#{domain => ?RMQLOG_DOMAIN_DB}),
ok;
start_rabbit_if_was_running(ThisNode, true = _RabbitWasRunning)
when ThisNode =:= node() ->
?LOG_DEBUG(
"DB: restart \"rabbit\" on this node (~ts) after changes to the "
"cluster",
[ThisNode],
#{domain => ?RMQLOG_DOMAIN_DB}),
rabbit:start();
start_rabbit_if_was_running(RemoteNode, true = _RabbitWasRunning) ->
?LOG_DEBUG(
"DB: restart \"rabbit\" on node `~ts` after changes to the cluster",
[RemoteNode],
#{domain => ?RMQLOG_DOMAIN_DB}),
erpc:call(RemoteNode, rabbit, start, []).

%% -------------------------------------------------------------------
%% Cluster update.
%% -------------------------------------------------------------------
Expand Down
22 changes: 10 additions & 12 deletions deps/rabbit/src/rabbit_khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,9 @@ ensure_ra_system_started() ->
{ok, _} = application:ensure_all_started(khepri),
ok = rabbit_ra_systems:ensure_ra_system_started(?RA_SYSTEM).

ensure_ra_system_stopped() ->
ok = rabbit_ra_systems:ensure_ra_system_stopped(?RA_SYSTEM).

retry_timeout() ->
case application:get_env(rabbit, khepri_leader_wait_retry_timeout) of
{ok, T} when is_integer(T) andalso T >= 0 -> T;
Expand Down Expand Up @@ -372,19 +375,14 @@ await_replication() ->
%% @private

reset() ->
case rabbit:is_running() of
false ->
%% Rabbit should be stopped, but Khepri needs to be running.
%% Restart it.
ok = setup(),
ok = khepri_cluster:reset(?RA_CLUSTER_NAME),
ok = khepri:stop(?RA_CLUSTER_NAME),
?assertNot(rabbit:is_running()),

_ = file:delete(rabbit_guid:filename()),
ok;
true ->
throw({error, rabbitmq_unexpectedly_running})
end.
ok = setup(),
ThisNode = node(),
rabbit_db_cluster:forget_member(ThisNode, false),
ok = khepri:stop(?RA_CLUSTER_NAME),
ok = ensure_ra_system_stopped(),
ok.

-spec dir() -> Dir when
Dir :: file:filename_all().
Expand Down
Loading