Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,7 @@ prep_stop(State) ->

stop(State) ->
ok = rabbit_alarm:stop(),
ok = case rabbit_mnesia:is_clustered() of
ok = case rabbit_db_cluster:is_clustered() of
true -> ok;
false -> rabbit_table:clear_ram_only_tables()
end,
Expand Down
6 changes: 3 additions & 3 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ retry_wait(Q, F, E, RetriesLeft) ->
{stopped, false} ->
E({absent, Q, stopped});
_ ->
case rabbit_mnesia:is_process_alive(QPid) of
case rabbit_process:is_process_alive(QPid) of
true ->
% rabbitmq-server#1682
% The old check would have crashed here,
Expand Down Expand Up @@ -1814,7 +1814,7 @@ is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) ->
false;
is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is_pid(Q) ->
Pid = amqqueue:get_pid(Q),
not rabbit_mnesia:is_process_alive(Pid).
not rabbit_process:is_process_alive(Pid).

-spec has_synchronised_mirrors_online(amqqueue:amqqueue()) -> boolean().
has_synchronised_mirrors_online(Q) ->
Expand Down Expand Up @@ -1881,7 +1881,7 @@ on_node_down(Node) ->
filter_transient_queues_to_delete(Node) ->
fun(Q) ->
amqqueue:qnode(Q) == Node andalso
not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q))
not rabbit_process:is_process_alive(amqqueue:get_pid(Q))
andalso (not amqqueue:is_classic(Q) orelse not amqqueue:is_durable(Q))
andalso (not rabbit_amqqueue:is_replicated(Q)
orelse rabbit_amqqueue:is_dead_exclusive(Q))
Expand Down
6 changes: 3 additions & 3 deletions deps/rabbit/src/rabbit_classic_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ is_recoverable(Q) when ?is_amqqueue(Q) ->
%% the policy). Thus, we must check if the local pid is alive
%% - if the record is present - in order to restart.
(not rabbit_db_queue:consistent_exists(amqqueue:get_name(Q))
orelse not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q))).
orelse not rabbit_process:is_process_alive(amqqueue:get_pid(Q))).

recover(VHost, Queues) ->
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
Expand Down Expand Up @@ -438,11 +438,11 @@ wait_for_promoted_or_stopped(Q0) ->
{ok, Q} ->
QPid = amqqueue:get_pid(Q),
SPids = amqqueue:get_slave_pids(Q),
case rabbit_mnesia:is_process_alive(QPid) of
case rabbit_process:is_process_alive(QPid) of
true -> {promoted, Q};
false ->
case lists:any(fun(Pid) ->
rabbit_mnesia:is_process_alive(Pid)
rabbit_process:is_process_alive(Pid)
end, SPids) of
%% There is a live slave. May be promoted
true ->
Expand Down
49 changes: 47 additions & 2 deletions deps/rabbit/src/rabbit_db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
-include_lib("rabbit_common/include/logging.hrl").

-export([init/0,
reset/0,
force_reset/0,
force_load_on_next_boot/0,
is_virgin_node/0, is_virgin_node/1,
dir/0,
ensure_dir_exists/0]).
Expand Down Expand Up @@ -40,7 +43,7 @@ init() ->
"DB: this node is virgin: ~ts", [IsVirgin],
#{domain => ?RMQLOG_DOMAIN_DB}),
ensure_dir_exists(),
case init_mnesia() of
case init_using_mnesia() of
ok ->
?LOG_DEBUG(
"DB: initialization successeful",
Expand All @@ -53,14 +56,56 @@ init() ->
Error
end.

init_mnesia() ->
init_using_mnesia() ->
?LOG_DEBUG(
"DB: initialize Mnesia",
#{domain => ?RMQLOG_DOMAIN_DB}),
ok = rabbit_mnesia:init(),
?assertEqual(rabbit:data_dir(), mnesia_dir()),
rabbit_sup:start_child(mnesia_sync).

-spec reset() -> Ret when
Ret :: ok.
%% @doc Resets the database and the node.

reset() ->
reset_using_mnesia().

reset_using_mnesia() ->
?LOG_DEBUG(
"DB: resetting node",
#{domain => ?RMQLOG_DOMAIN_DB}),
rabbit_mnesia:reset().

-spec force_reset() -> Ret when
Ret :: ok.
%% @doc Resets the database and the node.

force_reset() ->
force_reset_using_mnesia().

force_reset_using_mnesia() ->
?LOG_DEBUG(
"DB: resetting node forcefully",
#{domain => ?RMQLOG_DOMAIN_DB}),
rabbit_mnesia:force_reset().

-spec force_load_on_next_boot() -> Ret when
Ret :: ok.
%% @doc Requests that the database to be forcefully loaded during next boot.
%%
%% This is necessary when a node refuses to boot when the cluster is in a bad
%% state, like if critical members are MIA.

force_load_on_next_boot() ->
force_load_on_next_boot_using_mnesia().

force_load_on_next_boot_using_mnesia() ->
?LOG_DEBUG(
"DB: resetting node forcefully",
#{domain => ?RMQLOG_DOMAIN_DB}),
rabbit_mnesia:force_load_next_boot().

-spec is_virgin_node() -> IsVirgin when
IsVirgin :: boolean().
%% @doc Indicates if this RabbitMQ node is virgin.
Expand Down
150 changes: 150 additions & 0 deletions deps/rabbit/src/rabbit_db_cluster.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(rabbit_db_cluster).

-include_lib("kernel/include/logger.hrl").
-include_lib("stdlib/include/assert.hrl").

-include_lib("rabbit_common/include/logging.hrl").

-export([join/2,
forget_member/2]).
-export([change_node_type/1]).
-export([is_clustered/0,
members/0,
disc_members/0,
node_type/0,
check_consistency/0,
cli_cluster_status/0]).

-type node_type() :: disc_node_type() | ram_node_type().
-type disc_node_type() :: disc.
-type ram_node_type() :: ram.

-export_type([node_type/0, disc_node_type/0, ram_node_type/0]).

-define(
IS_NODE_TYPE(NodeType),
((NodeType) =:= disc orelse (NodeType) =:= ram)).

%% -------------------------------------------------------------------
%% Cluster formation.
%% -------------------------------------------------------------------

-spec join(RemoteNode, NodeType) -> Ret when
RemoteNode :: node(),
NodeType :: rabbit_db_cluster:node_type(),
Ret :: Ok | Error,
Ok :: ok | {ok, already_member},
Error :: {error, {inconsistent_cluster, string()}}.
%% @doc Adds this node to a cluster using `RemoteNode' to reach it.

join(RemoteNode, NodeType)
when is_atom(RemoteNode) andalso ?IS_NODE_TYPE(NodeType) ->
?LOG_DEBUG(
"DB: joining cluster using remote node `~ts`", [RemoteNode],
#{domain => ?RMQLOG_DOMAIN_DB}),
join_using_mnesia(RemoteNode, NodeType).

join_using_mnesia(RemoteNode, NodeType) ->
rabbit_mnesia:join_cluster(RemoteNode, NodeType).

-spec forget_member(Node, RemoveWhenOffline) -> ok when
Node :: node(),
RemoveWhenOffline :: boolean().
%% @doc Removes `Node' from the cluster.

forget_member(Node, RemoveWhenOffline) ->
forget_member_using_mnesia(Node, RemoveWhenOffline).

forget_member_using_mnesia(Node, RemoveWhenOffline) ->
rabbit_mnesia:forget_cluster_node(Node, RemoveWhenOffline).

%% -------------------------------------------------------------------
%% Cluster update.
%% -------------------------------------------------------------------

-spec change_node_type(NodeType) -> ok when
NodeType :: rabbit_db_cluster:node_type().
%% @doc Changes the node type to `NodeType'.
%%
%% Node types may not all be valid with all databases.

change_node_type(NodeType) ->
change_node_type_using_mnesia(NodeType).

change_node_type_using_mnesia(NodeType) ->
rabbit_mnesia:change_cluster_node_type(NodeType).

%% -------------------------------------------------------------------
%% Cluster status.
%% -------------------------------------------------------------------

-spec is_clustered() -> IsClustered when
IsClustered :: boolean().
%% @doc Indicates if this node is clustered with other nodes or not.

is_clustered() ->
is_clustered_using_mnesia().

is_clustered_using_mnesia() ->
rabbit_mnesia:is_clustered().

-spec members() -> Members when
Members :: [node()].
%% @doc Returns the list of cluster members.

members() ->
members_using_mnesia().

members_using_mnesia() ->
mnesia:system_info(db_nodes).

-spec disc_members() -> Members when
Members :: [node()].
%% @private

disc_members() ->
disc_members_using_mnesia().

disc_members_using_mnesia() ->
rabbit_mnesia:cluster_nodes(disc).

-spec node_type() -> NodeType when
NodeType :: rabbit_db_cluster:node_type().
%% @doc Returns the type of this node, `disc' or `ram'.
%%
%% Node types may not all be relevant with all databases.

node_type() ->
node_type_using_mnesia().

node_type_using_mnesia() ->
rabbit_mnesia:node_type().

-spec check_consistency() -> ok.
%% @doc Ensures the cluster is consistent.

check_consistency() ->
check_consistency_using_mnesia().

check_consistency_using_mnesia() ->
rabbit_mnesia:check_cluster_consistency().

-spec cli_cluster_status() -> ClusterStatus when
ClusterStatus :: [{nodes, [{rabbit_db_cluster:node_type(), [node()]}]} |
{running_nodes, [node()]} |
{partitions, [{node(), [node()]}]}].
%% @doc Returns information from the cluster for the `cluster_status' CLI
%% command.

cli_cluster_status() ->
cli_cluster_status_using_mnesia().

cli_cluster_status_using_mnesia() ->
rabbit_mnesia:status().
24 changes: 15 additions & 9 deletions deps/rabbit/src/rabbit_mnesia.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,22 @@
%% Used internally in rpc calls
-export([node_info/0, remove_node_if_mnesia_running/1]).

-deprecated({on_running_node, 1,
"Use rabbit_process:on_running_node/1 instead"}).
-deprecated({is_process_alive, 1,
"Use rabbit_process:is_process_alive/1 instead"}).
-deprecated({is_registered_process_alive, 1,
"Use rabbit_process:is_registered_process_alive/1 instead"}).

-ifdef(TEST).
-compile(export_all).
-export([init_with_lock/3]).
-endif.

%%----------------------------------------------------------------------------

-export_type([node_type/0, cluster_status/0]).
-export_type([cluster_status/0]).

-type node_type() :: disc | ram.
-type cluster_status() :: {[node()], [node()], [node()]}.

%%----------------------------------------------------------------------------
Expand Down Expand Up @@ -130,12 +136,12 @@ init_with_lock(Retries, Timeout, RunPeerDiscovery) ->
init_with_lock(Retries - 1, Timeout, RunPeerDiscovery)
end.

-spec run_peer_discovery() -> ok | {[node()], node_type()}.
-spec run_peer_discovery() -> ok | {[node()], rabbit_db_cluster:node_type()}.
run_peer_discovery() ->
{RetriesLeft, DelayInterval} = rabbit_peer_discovery:discovery_retries(),
run_peer_discovery_with_retries(RetriesLeft, DelayInterval).

-spec run_peer_discovery_with_retries(non_neg_integer(), non_neg_integer()) -> ok | {[node()], node_type()}.
-spec run_peer_discovery_with_retries(non_neg_integer(), non_neg_integer()) -> ok | {[node()], rabbit_db_cluster:node_type()}.
run_peer_discovery_with_retries(0, _DelayInterval) ->
ok;
run_peer_discovery_with_retries(RetriesLeft, DelayInterval) ->
Expand Down Expand Up @@ -221,7 +227,7 @@ join_discovered_peers_with_retries(TryNodes, NodeType, RetriesLeft, DelayInterva
%% all in the same cluster, we simply pick the first online node and
%% we cluster to its cluster.

-spec join_cluster(node(), node_type())
-spec join_cluster(node(), rabbit_db_cluster:node_type())
-> ok | {ok, already_member} | {error, {inconsistent_cluster, string()}}.

join_cluster(DiscoveryNode, NodeType) ->
Expand Down Expand Up @@ -310,7 +316,7 @@ wipe() ->
ok = rabbit_node_monitor:reset_cluster_status(),
ok.

-spec change_cluster_node_type(node_type()) -> 'ok'.
-spec change_cluster_node_type(rabbit_db_cluster:node_type()) -> 'ok'.

change_cluster_node_type(Type) ->
ensure_mnesia_not_running(),
Expand Down Expand Up @@ -414,7 +420,7 @@ remove_node_offline_node(Node) ->
%% Queries
%%----------------------------------------------------------------------------

-spec status() -> [{'nodes', [{node_type(), [node()]}]} |
-spec status() -> [{'nodes', [{rabbit_db_cluster:node_type(), [node()]}]} |
{'running_nodes', [node()]} |
{'partitions', [{node(), [node()]}]}].

Expand Down Expand Up @@ -532,7 +538,7 @@ node_info() ->
mnesia:system_info(protocol_version),
cluster_status_from_mnesia()}.

-spec node_type() -> node_type().
-spec node_type() -> rabbit_db_cluster:node_type().

node_type() ->
{_AllNodes, DiscNodes, _RunningNodes} =
Expand Down Expand Up @@ -600,7 +606,7 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) ->
rabbit_node_monitor:update_cluster_status(),
ok.

-spec init_db_unchecked([node()], node_type()) -> 'ok'.
-spec init_db_unchecked([node()], rabbit_db_cluster:node_type()) -> 'ok'.

init_db_unchecked(ClusterNodes, NodeType) ->
init_db(ClusterNodes, NodeType, false).
Expand Down
6 changes: 3 additions & 3 deletions deps/rabbit/src/rabbit_node_monitor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ notify_joined_cluster() ->
NewMember = node(),
Nodes = rabbit_nodes:list_running() -- [NewMember],
gen_server:abcast(Nodes, ?SERVER,
{joined_cluster, node(), rabbit_mnesia:node_type()}),
{joined_cluster, node(), rabbit_db_cluster:node_type()}),

ok.

Expand Down Expand Up @@ -415,9 +415,9 @@ handle_call(_Request, _From, State) ->
handle_cast(notify_node_up, State = #state{guid = GUID}) ->
Nodes = rabbit_nodes:list_running() -- [node()],
gen_server:abcast(Nodes, ?SERVER,
{node_up, node(), rabbit_mnesia:node_type(), GUID}),
{node_up, node(), rabbit_db_cluster:node_type(), GUID}),
%% register other active rabbits with this rabbit
DiskNodes = rabbit_mnesia:cluster_nodes(disc),
DiskNodes = rabbit_db_cluster:disc_members(),
[gen_server:cast(?SERVER, {node_up, N, case lists:member(N, DiskNodes) of
true -> disc;
false -> ram
Expand Down
Loading