diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index a09a726f6762..236e084e3d1b 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -1026,7 +1026,7 @@ prep_stop(State) -> stop(State) -> ok = rabbit_alarm:stop(), - ok = case rabbit_mnesia:is_clustered() of + ok = case rabbit_db_cluster:is_clustered() of true -> ok; false -> rabbit_table:clear_ram_only_tables() end, diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index a6ef8adcd97c..2fff0b788795 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -574,7 +574,7 @@ retry_wait(Q, F, E, RetriesLeft) -> {stopped, false} -> E({absent, Q, stopped}); _ -> - case rabbit_mnesia:is_process_alive(QPid) of + case rabbit_process:is_process_alive(QPid) of true -> % rabbitmq-server#1682 % The old check would have crashed here, @@ -1814,7 +1814,7 @@ is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) -> false; is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is_pid(Q) -> Pid = amqqueue:get_pid(Q), - not rabbit_mnesia:is_process_alive(Pid). + not rabbit_process:is_process_alive(Pid). -spec has_synchronised_mirrors_online(amqqueue:amqqueue()) -> boolean(). has_synchronised_mirrors_online(Q) -> @@ -1881,7 +1881,7 @@ on_node_down(Node) -> filter_transient_queues_to_delete(Node) -> fun(Q) -> amqqueue:qnode(Q) == Node andalso - not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q)) + not rabbit_process:is_process_alive(amqqueue:get_pid(Q)) andalso (not amqqueue:is_classic(Q) orelse not amqqueue:is_durable(Q)) andalso (not rabbit_amqqueue:is_replicated(Q) orelse rabbit_amqqueue:is_dead_exclusive(Q)) diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index 4ba3244dc880..c9b0c36408a7 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -129,7 +129,7 @@ is_recoverable(Q) when ?is_amqqueue(Q) -> %% the policy). Thus, we must check if the local pid is alive %% - if the record is present - in order to restart. (not rabbit_db_queue:consistent_exists(amqqueue:get_name(Q)) - orelse not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q))). + orelse not rabbit_process:is_process_alive(amqqueue:get_pid(Q))). recover(VHost, Queues) -> {ok, BQ} = application:get_env(rabbit, backing_queue_module), @@ -438,11 +438,11 @@ wait_for_promoted_or_stopped(Q0) -> {ok, Q} -> QPid = amqqueue:get_pid(Q), SPids = amqqueue:get_slave_pids(Q), - case rabbit_mnesia:is_process_alive(QPid) of + case rabbit_process:is_process_alive(QPid) of true -> {promoted, Q}; false -> case lists:any(fun(Pid) -> - rabbit_mnesia:is_process_alive(Pid) + rabbit_process:is_process_alive(Pid) end, SPids) of %% There is a live slave. May be promoted true -> diff --git a/deps/rabbit/src/rabbit_db.erl b/deps/rabbit/src/rabbit_db.erl index 0e60e2033e3e..e15b78df3caf 100644 --- a/deps/rabbit/src/rabbit_db.erl +++ b/deps/rabbit/src/rabbit_db.erl @@ -13,6 +13,9 @@ -include_lib("rabbit_common/include/logging.hrl"). -export([init/0, + reset/0, + force_reset/0, + force_load_on_next_boot/0, is_virgin_node/0, is_virgin_node/1, dir/0, ensure_dir_exists/0]). @@ -40,7 +43,7 @@ init() -> "DB: this node is virgin: ~ts", [IsVirgin], #{domain => ?RMQLOG_DOMAIN_DB}), ensure_dir_exists(), - case init_mnesia() of + case init_using_mnesia() of ok -> ?LOG_DEBUG( "DB: initialization successeful", @@ -53,7 +56,7 @@ init() -> Error end. -init_mnesia() -> +init_using_mnesia() -> ?LOG_DEBUG( "DB: initialize Mnesia", #{domain => ?RMQLOG_DOMAIN_DB}), @@ -61,6 +64,48 @@ init_mnesia() -> ?assertEqual(rabbit:data_dir(), mnesia_dir()), rabbit_sup:start_child(mnesia_sync). +-spec reset() -> Ret when + Ret :: ok. +%% @doc Resets the database and the node. + +reset() -> + reset_using_mnesia(). + +reset_using_mnesia() -> + ?LOG_DEBUG( + "DB: resetting node", + #{domain => ?RMQLOG_DOMAIN_DB}), + rabbit_mnesia:reset(). + +-spec force_reset() -> Ret when + Ret :: ok. +%% @doc Resets the database and the node. + +force_reset() -> + force_reset_using_mnesia(). + +force_reset_using_mnesia() -> + ?LOG_DEBUG( + "DB: resetting node forcefully", + #{domain => ?RMQLOG_DOMAIN_DB}), + rabbit_mnesia:force_reset(). + +-spec force_load_on_next_boot() -> Ret when + Ret :: ok. +%% @doc Requests that the database to be forcefully loaded during next boot. +%% +%% This is necessary when a node refuses to boot when the cluster is in a bad +%% state, like if critical members are MIA. + +force_load_on_next_boot() -> + force_load_on_next_boot_using_mnesia(). + +force_load_on_next_boot_using_mnesia() -> + ?LOG_DEBUG( + "DB: resetting node forcefully", + #{domain => ?RMQLOG_DOMAIN_DB}), + rabbit_mnesia:force_load_next_boot(). + -spec is_virgin_node() -> IsVirgin when IsVirgin :: boolean(). %% @doc Indicates if this RabbitMQ node is virgin. diff --git a/deps/rabbit/src/rabbit_db_cluster.erl b/deps/rabbit/src/rabbit_db_cluster.erl new file mode 100644 index 000000000000..aa89ac0b7813 --- /dev/null +++ b/deps/rabbit/src/rabbit_db_cluster.erl @@ -0,0 +1,150 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_db_cluster). + +-include_lib("kernel/include/logger.hrl"). +-include_lib("stdlib/include/assert.hrl"). + +-include_lib("rabbit_common/include/logging.hrl"). + +-export([join/2, + forget_member/2]). +-export([change_node_type/1]). +-export([is_clustered/0, + members/0, + disc_members/0, + node_type/0, + check_consistency/0, + cli_cluster_status/0]). + +-type node_type() :: disc_node_type() | ram_node_type(). +-type disc_node_type() :: disc. +-type ram_node_type() :: ram. + +-export_type([node_type/0, disc_node_type/0, ram_node_type/0]). + +-define( + IS_NODE_TYPE(NodeType), + ((NodeType) =:= disc orelse (NodeType) =:= ram)). + +%% ------------------------------------------------------------------- +%% Cluster formation. +%% ------------------------------------------------------------------- + +-spec join(RemoteNode, NodeType) -> Ret when + RemoteNode :: node(), + NodeType :: rabbit_db_cluster:node_type(), + Ret :: Ok | Error, + Ok :: ok | {ok, already_member}, + Error :: {error, {inconsistent_cluster, string()}}. +%% @doc Adds this node to a cluster using `RemoteNode' to reach it. + +join(RemoteNode, NodeType) + when is_atom(RemoteNode) andalso ?IS_NODE_TYPE(NodeType) -> + ?LOG_DEBUG( + "DB: joining cluster using remote node `~ts`", [RemoteNode], + #{domain => ?RMQLOG_DOMAIN_DB}), + join_using_mnesia(RemoteNode, NodeType). + +join_using_mnesia(RemoteNode, NodeType) -> + rabbit_mnesia:join_cluster(RemoteNode, NodeType). + +-spec forget_member(Node, RemoveWhenOffline) -> ok when + Node :: node(), + RemoveWhenOffline :: boolean(). +%% @doc Removes `Node' from the cluster. + +forget_member(Node, RemoveWhenOffline) -> + forget_member_using_mnesia(Node, RemoveWhenOffline). + +forget_member_using_mnesia(Node, RemoveWhenOffline) -> + rabbit_mnesia:forget_cluster_node(Node, RemoveWhenOffline). + +%% ------------------------------------------------------------------- +%% Cluster update. +%% ------------------------------------------------------------------- + +-spec change_node_type(NodeType) -> ok when + NodeType :: rabbit_db_cluster:node_type(). +%% @doc Changes the node type to `NodeType'. +%% +%% Node types may not all be valid with all databases. + +change_node_type(NodeType) -> + change_node_type_using_mnesia(NodeType). + +change_node_type_using_mnesia(NodeType) -> + rabbit_mnesia:change_cluster_node_type(NodeType). + +%% ------------------------------------------------------------------- +%% Cluster status. +%% ------------------------------------------------------------------- + +-spec is_clustered() -> IsClustered when + IsClustered :: boolean(). +%% @doc Indicates if this node is clustered with other nodes or not. + +is_clustered() -> + is_clustered_using_mnesia(). + +is_clustered_using_mnesia() -> + rabbit_mnesia:is_clustered(). + +-spec members() -> Members when + Members :: [node()]. +%% @doc Returns the list of cluster members. + +members() -> + members_using_mnesia(). + +members_using_mnesia() -> + mnesia:system_info(db_nodes). + +-spec disc_members() -> Members when + Members :: [node()]. +%% @private + +disc_members() -> + disc_members_using_mnesia(). + +disc_members_using_mnesia() -> + rabbit_mnesia:cluster_nodes(disc). + +-spec node_type() -> NodeType when + NodeType :: rabbit_db_cluster:node_type(). +%% @doc Returns the type of this node, `disc' or `ram'. +%% +%% Node types may not all be relevant with all databases. + +node_type() -> + node_type_using_mnesia(). + +node_type_using_mnesia() -> + rabbit_mnesia:node_type(). + +-spec check_consistency() -> ok. +%% @doc Ensures the cluster is consistent. + +check_consistency() -> + check_consistency_using_mnesia(). + +check_consistency_using_mnesia() -> + rabbit_mnesia:check_cluster_consistency(). + +-spec cli_cluster_status() -> ClusterStatus when + ClusterStatus :: [{nodes, [{rabbit_db_cluster:node_type(), [node()]}]} | + {running_nodes, [node()]} | + {partitions, [{node(), [node()]}]}]. +%% @doc Returns information from the cluster for the `cluster_status' CLI +%% command. + +cli_cluster_status() -> + cli_cluster_status_using_mnesia(). + +cli_cluster_status_using_mnesia() -> + rabbit_mnesia:status(). diff --git a/deps/rabbit/src/rabbit_mnesia.erl b/deps/rabbit/src/rabbit_mnesia.erl index 703f7f039dc3..bc4426c31479 100644 --- a/deps/rabbit/src/rabbit_mnesia.erl +++ b/deps/rabbit/src/rabbit_mnesia.erl @@ -56,6 +56,13 @@ %% Used internally in rpc calls -export([node_info/0, remove_node_if_mnesia_running/1]). +-deprecated({on_running_node, 1, + "Use rabbit_process:on_running_node/1 instead"}). +-deprecated({is_process_alive, 1, + "Use rabbit_process:is_process_alive/1 instead"}). +-deprecated({is_registered_process_alive, 1, + "Use rabbit_process:is_registered_process_alive/1 instead"}). + -ifdef(TEST). -compile(export_all). -export([init_with_lock/3]). @@ -63,9 +70,8 @@ %%---------------------------------------------------------------------------- --export_type([node_type/0, cluster_status/0]). +-export_type([cluster_status/0]). --type node_type() :: disc | ram. -type cluster_status() :: {[node()], [node()], [node()]}. %%---------------------------------------------------------------------------- @@ -130,12 +136,12 @@ init_with_lock(Retries, Timeout, RunPeerDiscovery) -> init_with_lock(Retries - 1, Timeout, RunPeerDiscovery) end. --spec run_peer_discovery() -> ok | {[node()], node_type()}. +-spec run_peer_discovery() -> ok | {[node()], rabbit_db_cluster:node_type()}. run_peer_discovery() -> {RetriesLeft, DelayInterval} = rabbit_peer_discovery:discovery_retries(), run_peer_discovery_with_retries(RetriesLeft, DelayInterval). --spec run_peer_discovery_with_retries(non_neg_integer(), non_neg_integer()) -> ok | {[node()], node_type()}. +-spec run_peer_discovery_with_retries(non_neg_integer(), non_neg_integer()) -> ok | {[node()], rabbit_db_cluster:node_type()}. run_peer_discovery_with_retries(0, _DelayInterval) -> ok; run_peer_discovery_with_retries(RetriesLeft, DelayInterval) -> @@ -221,7 +227,7 @@ join_discovered_peers_with_retries(TryNodes, NodeType, RetriesLeft, DelayInterva %% all in the same cluster, we simply pick the first online node and %% we cluster to its cluster. --spec join_cluster(node(), node_type()) +-spec join_cluster(node(), rabbit_db_cluster:node_type()) -> ok | {ok, already_member} | {error, {inconsistent_cluster, string()}}. join_cluster(DiscoveryNode, NodeType) -> @@ -310,7 +316,7 @@ wipe() -> ok = rabbit_node_monitor:reset_cluster_status(), ok. --spec change_cluster_node_type(node_type()) -> 'ok'. +-spec change_cluster_node_type(rabbit_db_cluster:node_type()) -> 'ok'. change_cluster_node_type(Type) -> ensure_mnesia_not_running(), @@ -414,7 +420,7 @@ remove_node_offline_node(Node) -> %% Queries %%---------------------------------------------------------------------------- --spec status() -> [{'nodes', [{node_type(), [node()]}]} | +-spec status() -> [{'nodes', [{rabbit_db_cluster:node_type(), [node()]}]} | {'running_nodes', [node()]} | {'partitions', [{node(), [node()]}]}]. @@ -532,7 +538,7 @@ node_info() -> mnesia:system_info(protocol_version), cluster_status_from_mnesia()}. --spec node_type() -> node_type(). +-spec node_type() -> rabbit_db_cluster:node_type(). node_type() -> {_AllNodes, DiscNodes, _RunningNodes} = @@ -600,7 +606,7 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) -> rabbit_node_monitor:update_cluster_status(), ok. --spec init_db_unchecked([node()], node_type()) -> 'ok'. +-spec init_db_unchecked([node()], rabbit_db_cluster:node_type()) -> 'ok'. init_db_unchecked(ClusterNodes, NodeType) -> init_db(ClusterNodes, NodeType, false). diff --git a/deps/rabbit/src/rabbit_node_monitor.erl b/deps/rabbit/src/rabbit_node_monitor.erl index 17d63a683178..d469830c9b0c 100644 --- a/deps/rabbit/src/rabbit_node_monitor.erl +++ b/deps/rabbit/src/rabbit_node_monitor.erl @@ -169,7 +169,7 @@ notify_joined_cluster() -> NewMember = node(), Nodes = rabbit_nodes:list_running() -- [NewMember], gen_server:abcast(Nodes, ?SERVER, - {joined_cluster, node(), rabbit_mnesia:node_type()}), + {joined_cluster, node(), rabbit_db_cluster:node_type()}), ok. @@ -415,9 +415,9 @@ handle_call(_Request, _From, State) -> handle_cast(notify_node_up, State = #state{guid = GUID}) -> Nodes = rabbit_nodes:list_running() -- [node()], gen_server:abcast(Nodes, ?SERVER, - {node_up, node(), rabbit_mnesia:node_type(), GUID}), + {node_up, node(), rabbit_db_cluster:node_type(), GUID}), %% register other active rabbits with this rabbit - DiskNodes = rabbit_mnesia:cluster_nodes(disc), + DiskNodes = rabbit_db_cluster:disc_members(), [gen_server:cast(?SERVER, {node_up, N, case lists:member(N, DiskNodes) of true -> disc; false -> ram diff --git a/deps/rabbit/src/rabbit_nodes.erl b/deps/rabbit/src/rabbit_nodes.erl index a9eb9dad7060..f2e610d3eb1f 100644 --- a/deps/rabbit/src/rabbit_nodes.erl +++ b/deps/rabbit/src/rabbit_nodes.erl @@ -180,7 +180,7 @@ is_member(Node) when is_atom(Node) -> %% @see filter_members/1. list_members() -> - mnesia:system_info(db_nodes). + rabbit_db_cluster:members(). -spec filter_members(Nodes) -> Nodes when Nodes :: [node()]. diff --git a/deps/rabbit/src/rabbit_prelaunch_cluster.erl b/deps/rabbit/src/rabbit_prelaunch_cluster.erl index c63ebb4b76fa..7effd20cc4c1 100644 --- a/deps/rabbit/src/rabbit_prelaunch_cluster.erl +++ b/deps/rabbit/src/rabbit_prelaunch_cluster.erl @@ -30,5 +30,5 @@ setup(Context) -> ?LOG_DEBUG( "Checking cluster consistency", [], #{domain => ?RMQLOG_DOMAIN_PRELAUNCH}), - rabbit_mnesia:check_cluster_consistency(), + rabbit_db_cluster:check_consistency(), ok. diff --git a/deps/rabbit/src/rabbit_prequeue.erl b/deps/rabbit/src/rabbit_prequeue.erl index 02a3be53fb3d..27aac858532c 100644 --- a/deps/rabbit/src/rabbit_prequeue.erl +++ b/deps/rabbit/src/rabbit_prequeue.erl @@ -55,9 +55,9 @@ init(Q0, restart) when ?is_amqqueue(Q0) -> QPid = amqqueue:get_pid(Q1), SPids = amqqueue:get_slave_pids(Q1), LocalOrMasterDown = node(QPid) =:= node() - orelse not rabbit_mnesia:on_running_node(QPid), - Slaves = [SPid || SPid <- SPids, rabbit_mnesia:is_process_alive(SPid)], - case rabbit_mnesia:is_process_alive(QPid) of + orelse not rabbit_process:on_running_node(QPid), + Slaves = [SPid || SPid <- SPids, rabbit_process:is_process_alive(SPid)], + case rabbit_process:is_process_alive(QPid) of true -> false = LocalOrMasterDown, %% assertion rabbit_mirror_queue_slave:go(self(), async), rabbit_mirror_queue_slave:init(Q1); %% [1] diff --git a/deps/rabbit/src/rabbit_process.erl b/deps/rabbit/src/rabbit_process.erl new file mode 100644 index 000000000000..29ab7456354c --- /dev/null +++ b/deps/rabbit/src/rabbit_process.erl @@ -0,0 +1,77 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_process). + +-export([on_running_node/1, + is_process_alive/1, + is_registered_process_alive/1]). + +-spec on_running_node(Pid) -> OnRunningNode when + Pid :: pid(), + OnRunningNode :: boolean(). +%% @doc Indicates if the specified process runs on a member of the cluster. +%% +%% @param Pid the PID of the process to check +%% @returns true if the process runs on one of the cluster members, false +%% otherwise. + +on_running_node(Pid) -> + Node = node(Pid), + rabbit_nodes:is_running(Node). + +%% This requires the process be in the same running cluster as us +%% (i.e. not partitioned or some random node). +%% +%% See also rabbit_misc:is_process_alive/1 which does not. + +-spec is_process_alive(Pid) -> IsAlive when + Pid :: pid() | {RegisteredName, Node}, + RegisteredName :: atom(), + Node :: node(), + IsAlive :: boolean(). +%% @doc Indicates if the specified process is alive. +%% +%% Unlike {@link erlang:is_process_alive/1}, this function works with remote +%% processes and registered processes. However, the process must run on one of +%% the cluster members. +%% +%% @param Pid the PID or name of the process to check +%% @returns true if the process is alive runs on one of the cluster members, +%% false otherwise. + +is_process_alive(Pid) when is_pid(Pid) -> + on_running_node(Pid) + andalso + rpc:call(node(Pid), erlang, is_process_alive, [Pid]) =:= true; +is_process_alive({Name, Node}) when is_atom(Name) andalso is_atom(Node) -> + case rabbit_nodes:is_running(Node) of + true -> + try + erpc:call(Node, ?MODULE, is_registered_process_alive, [Name]) + catch + error:{exception, undef, [{?MODULE, _, _, _} | _]} -> + rpc:call( + Node, + rabbit_mnesia, is_registered_process_alive, [Name]) + end; + false -> + false + end. + +-spec is_registered_process_alive(RegisteredName) -> IsAlive when + RegisteredName :: atom(), + IsAlive :: boolean(). +%% @doc Indicates if the specified registered process is alive. +%% +%% The process must be local to this node. +%% +%% @param RegisteredName the name of the registered process +%% @returns true if the process is alive, false otherwise. + +is_registered_process_alive(Name) -> + is_pid(whereis(Name)). diff --git a/deps/rabbit/test/clustering_management_SUITE.erl b/deps/rabbit/test/clustering_management_SUITE.erl index 1e9e74992762..2f7e9f54987f 100644 --- a/deps/rabbit/test/clustering_management_SUITE.erl +++ b/deps/rabbit/test/clustering_management_SUITE.erl @@ -780,7 +780,7 @@ wait_for_cluster_status(N, Max, Status, AllNodes, Nodes) -> verify_status_equal(Node, Status, AllNodes) -> NodeStatus = sort_cluster_status(cluster_status(Node)), - (AllNodes =/= [Node]) =:= rpc:call(Node, rabbit_mnesia, is_clustered, []) + (AllNodes =/= [Node]) =:= rpc:call(Node, rabbit_db_cluster, is_clustered, []) andalso NodeStatus =:= Status. cluster_status(Node) -> diff --git a/deps/rabbit_common/src/rabbit_misc.erl b/deps/rabbit_common/src/rabbit_misc.erl index 30d346b38876..a882e8fb6cdf 100644 --- a/deps/rabbit_common/src/rabbit_misc.erl +++ b/deps/rabbit_common/src/rabbit_misc.erl @@ -881,7 +881,7 @@ ntoab(IP) -> %% loop in rabbit_amqqueue:on_node_down/1 and any delays we incur %% would be bad news. %% -%% See also rabbit_mnesia:is_process_alive/1 which also requires the +%% See also rabbit_process:is_process_alive/1 which also requires the %% process be in the same running cluster as us (i.e. not partitioned %% or some random node). is_process_alive(Pid) when node(Pid) =:= node() -> diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/change_cluster_node_type_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/change_cluster_node_type_command.ex index bf3b43e7c49c..caccd1d7a8b3 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/change_cluster_node_type_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/change_cluster_node_type_command.ex @@ -29,14 +29,14 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ChangeClusterNodeTypeCommand do def run([node_type_arg], %{node: node_name}) do normalized_type = normalize_type(String.to_atom(node_type_arg)) - current_type = :rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :node_type, []) + current_type = :rabbit_misc.rpc_call(node_name, :rabbit_db_cluster, :node_type, []) case current_type do ^normalized_type -> {:ok, "Node type is already #{normalized_type}"} _ -> - :rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :change_cluster_node_type, [ + :rabbit_misc.rpc_call(node_name, :rabbit_db_cluster, :change_node_type, [ normalized_type ]) end diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/cluster_status_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/cluster_status_command.ex index 3217664047ae..b683d0bcc9c5 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/cluster_status_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/cluster_status_command.ex @@ -33,7 +33,19 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ClusterStatusCommand do use RabbitMQ.CLI.Core.RequiresRabbitAppRunning def run([], %{node: node_name, timeout: timeout}) do - case :rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :status, []) do + status = + case :rabbit_misc.rpc_call(node_name, :rabbit_db_cluster, :cli_cluster_status, []) do + {:badrpc, {:EXIT, {:undef, _}}} -> + :rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :status, []) + + {:badrpc, _} = err -> + err + + status -> + status + end + + case status do {:badrpc, _} = err -> err diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_boot_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_boot_command.ex index 978ee0df01e4..00c4101d6a85 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_boot_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_boot_command.ex @@ -24,7 +24,16 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ForceBootCommand do end def run([], %{node: node_name} = opts) do - case :rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :force_load_next_boot, []) do + ret = + case :rabbit_misc.rpc_call(node_name, :rabbit_db, :force_load_on_next_boot, []) do + {:badrpc, {:EXIT, {:undef, _}}} -> + :rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :force_load_next_boot, []) + + ret0 -> + ret0 + end + + case ret do {:badrpc, :nodedown} -> case Config.get_option(:data_dir, opts) do nil -> diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_reset_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_reset_command.ex index b8e87d4810a1..124dc28cd36d 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_reset_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_reset_command.ex @@ -14,7 +14,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ForceResetCommand do use RabbitMQ.CLI.Core.RequiresRabbitAppStopped def run([], %{node: node_name}) do - :rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :force_reset, []) + :rabbit_misc.rpc_call(node_name, :rabbit_db, :force_reset, []) end def output({:error, :mnesia_unexpectedly_running}, %{node: node_name}) do diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/forget_cluster_node_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/forget_cluster_node_command.ex index 064b51baf07f..fb8f662ce11c 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/forget_cluster_node_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/forget_cluster_node_command.ex @@ -39,7 +39,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ForgetClusterNodeCommand do become(node_name, opts), RabbitMQ.CLI.Core.Helpers.defer(fn -> _ = :rabbit_event.start_link() - :rabbit_mnesia.forget_cluster_node(to_atom(node_to_remove), true) + :rabbit_db_cluster.forget_member(to_atom(node_to_remove), true) end) ]) end @@ -48,7 +48,16 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ForgetClusterNodeCommand do atom_name = to_atom(node_to_remove) args = [atom_name, false] - case :rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :forget_cluster_node, args) do + ret = + case :rabbit_misc.rpc_call(node_name, :rabbit_db_cluster, :forget_member, args) do + {:badrpc, {:EXIT, {:undef, _}}} -> + :rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :forget_cluster_node, args) + + ret0 -> + ret0 + end + + case ret do {:error, {:failed_to_remove_node, ^atom_name, {:active, _, _}}} -> {:error, "RabbitMQ on node #{node_to_remove} must be stopped with 'rabbitmqctl -n #{node_to_remove} stop_app' before it can be removed"} diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/join_cluster_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/join_cluster_command.ex index f75498831fdb..11b9695ac508 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/join_cluster_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/join_cluster_command.ex @@ -42,12 +42,26 @@ defmodule RabbitMQ.CLI.Ctl.Commands.JoinClusterCommand do long_or_short_names = Config.get_option(:longnames, opts) target_node_normalised = Helpers.normalise_node(target_node, long_or_short_names) - :rabbit_misc.rpc_call( - node_name, - :rabbit_mnesia, - :join_cluster, - [target_node_normalised, node_type] - ) + ret = + :rabbit_misc.rpc_call( + node_name, + :rabbit_db_cluster, + :join, + [target_node_normalised, node_type] + ) + + case ret do + {:badrpc, {:EXIT, {:undef, _}}} -> + :rabbit_misc.rpc_call( + node_name, + :rabbit_mnesia, + :join_cluster, + [target_node_normalised, node_type] + ) + + _ -> + ret + end end def output({:ok, :already_member}, _) do diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/reset_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/reset_command.ex index 7331451fa7fc..dfd85631f25d 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/reset_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/reset_command.ex @@ -14,7 +14,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ResetCommand do use RabbitMQ.CLI.Core.RequiresRabbitAppStopped def run([], %{node: node_name}) do - :rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :reset, []) + :rabbit_misc.rpc_call(node_name, :rabbit_db, :reset, []) end def output({:error, :mnesia_unexpectedly_running}, %{node: node_name}) do diff --git a/deps/rabbitmq_ct_helpers/test/terraform_SUITE.erl b/deps/rabbitmq_ct_helpers/test/terraform_SUITE.erl index 34a60619b929..c5c375b2082c 100644 --- a/deps/rabbitmq_ct_helpers/test/terraform_SUITE.erl +++ b/deps/rabbitmq_ct_helpers/test/terraform_SUITE.erl @@ -159,7 +159,7 @@ run_four_rabbitmq_nodes(Config) -> ?assertEqual([true, true, true, true], rabbit_ct_broker_helpers:rpc_all( - Config, rabbit_mnesia, is_clustered, [])), + Config, rabbit_db_cluster, is_clustered, [])), ClusteredNodes = lists:sort( rabbit_ct_broker_helpers:rpc( Config, 0, rabbit_nodes, list_running, [])), diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_node.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_node.erl index 69d31d1543dd..7a79af9a6e9c 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_wm_node.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_node.erl @@ -59,7 +59,7 @@ augment(Key, ReqData, Node, Data) -> end. node_data(Node, ReqData) -> - S = rabbit_mnesia:status(), + S = rabbit_db_cluster:cli_cluster_status(), Nodes = proplists:get_value(nodes, S), Running = proplists:get_value(running_nodes, S), Type = find_type(Node, Nodes), diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_nodes.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_nodes.erl index 6c95cae38ba6..52c94490fbe9 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_wm_nodes.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_nodes.erl @@ -47,7 +47,7 @@ all_nodes(ReqData) -> end. all_nodes_raw() -> - S = rabbit_mnesia:status(), + S = rabbit_db_cluster:cli_cluster_status(), Nodes = proplists:get_value(nodes, S), Types = proplists:get_keys(Nodes), Running = proplists:get_value(running_nodes, S), diff --git a/deps/rabbitmq_peer_discovery_common/src/rabbit_peer_discovery_cleanup.erl b/deps/rabbitmq_peer_discovery_common/src/rabbit_peer_discovery_cleanup.erl index 4a38fabe0cbd..6ae37068d1bd 100644 --- a/deps/rabbitmq_peer_discovery_common/src/rabbit_peer_discovery_cleanup.erl +++ b/deps/rabbitmq_peer_discovery_common/src/rabbit_peer_discovery_cleanup.erl @@ -277,7 +277,7 @@ maybe_remove_nodes([Node | Nodes], false) -> ?LOG_WARNING( "Peer discovery: removing unknown node ~ts from the cluster", [Node], #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), - rabbit_mnesia:forget_cluster_node(Node, false), + rabbit_db_cluster:forget_member(Node, false), maybe_remove_nodes(Nodes, false). %%-------------------------------------------------------------------- @@ -288,13 +288,7 @@ maybe_remove_nodes([Node | Nodes], false) -> %%-------------------------------------------------------------------- -spec unreachable_nodes() -> [node()]. unreachable_nodes() -> - Status = rabbit_mnesia:status(), - Nodes = proplists:get_value(nodes, Status, []), - Running = proplists:get_value(running_nodes, Status, []), - All = lists:merge(proplists:get_value(disc, Nodes, []), - proplists:get_value(ram, Nodes, [])), - lists:subtract(All, Running). - + rabbit_nodes:list_unreachable(). %%-------------------------------------------------------------------- %% @private diff --git a/deps/rabbitmq_sharding/src/rabbit_sharding_shard.erl b/deps/rabbitmq_sharding/src/rabbit_sharding_shard.erl index fe0344bfc157..81b127190dc2 100644 --- a/deps/rabbitmq_sharding/src/rabbit_sharding_shard.erl +++ b/deps/rabbitmq_sharding/src/rabbit_sharding_shard.erl @@ -127,7 +127,4 @@ v(#resource{virtual_host = VHost}) -> VHost. foreach_node(F) -> - [F(Node) || Node <- running_nodes()]. - -running_nodes() -> - proplists:get_value(running_nodes, rabbit_mnesia:status(), []). + [F(Node) || Node <- rabbit_nodes:list_running()].