From a826b5720cc932d4839b571c2948afa4eae63f50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Wed, 8 Feb 2023 18:22:03 +0100 Subject: [PATCH 1/3] rabbit_process: Move process liveness functions here from `rabbit_mnesia` These functions extend the functionality of `erlang:is_process_alive/1` to take into account the node a process is running on and its cluster membership. These functions are moved away from `rabbit_mnesia` because we don't want `rabbit_mnesia` to be a central piece of RabbitMQ. Classic-mirrored-queue-related modules continue to use `rabbit_mnesia` functions, therefore relying on Mnesia, because they depend entirely on Mnesia anyway. They will go away at the same time as our use of Mnesia. So by keeping this code untouched, we avoid possible regressions. --- deps/rabbit/src/rabbit_amqqueue.erl | 6 +- deps/rabbit/src/rabbit_classic_queue.erl | 6 +- deps/rabbit/src/rabbit_mnesia.erl | 7 +++ deps/rabbit/src/rabbit_prequeue.erl | 6 +- deps/rabbit/src/rabbit_process.erl | 77 ++++++++++++++++++++++++ deps/rabbit_common/src/rabbit_misc.erl | 2 +- 6 files changed, 94 insertions(+), 10 deletions(-) create mode 100644 deps/rabbit/src/rabbit_process.erl diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index a6ef8adcd97c..2fff0b788795 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -574,7 +574,7 @@ retry_wait(Q, F, E, RetriesLeft) -> {stopped, false} -> E({absent, Q, stopped}); _ -> - case rabbit_mnesia:is_process_alive(QPid) of + case rabbit_process:is_process_alive(QPid) of true -> % rabbitmq-server#1682 % The old check would have crashed here, @@ -1814,7 +1814,7 @@ is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) -> false; is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is_pid(Q) -> Pid = amqqueue:get_pid(Q), - not rabbit_mnesia:is_process_alive(Pid). + not rabbit_process:is_process_alive(Pid). -spec has_synchronised_mirrors_online(amqqueue:amqqueue()) -> boolean(). has_synchronised_mirrors_online(Q) -> @@ -1881,7 +1881,7 @@ on_node_down(Node) -> filter_transient_queues_to_delete(Node) -> fun(Q) -> amqqueue:qnode(Q) == Node andalso - not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q)) + not rabbit_process:is_process_alive(amqqueue:get_pid(Q)) andalso (not amqqueue:is_classic(Q) orelse not amqqueue:is_durable(Q)) andalso (not rabbit_amqqueue:is_replicated(Q) orelse rabbit_amqqueue:is_dead_exclusive(Q)) diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index 4ba3244dc880..c9b0c36408a7 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -129,7 +129,7 @@ is_recoverable(Q) when ?is_amqqueue(Q) -> %% the policy). Thus, we must check if the local pid is alive %% - if the record is present - in order to restart. (not rabbit_db_queue:consistent_exists(amqqueue:get_name(Q)) - orelse not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q))). + orelse not rabbit_process:is_process_alive(amqqueue:get_pid(Q))). recover(VHost, Queues) -> {ok, BQ} = application:get_env(rabbit, backing_queue_module), @@ -438,11 +438,11 @@ wait_for_promoted_or_stopped(Q0) -> {ok, Q} -> QPid = amqqueue:get_pid(Q), SPids = amqqueue:get_slave_pids(Q), - case rabbit_mnesia:is_process_alive(QPid) of + case rabbit_process:is_process_alive(QPid) of true -> {promoted, Q}; false -> case lists:any(fun(Pid) -> - rabbit_mnesia:is_process_alive(Pid) + rabbit_process:is_process_alive(Pid) end, SPids) of %% There is a live slave. May be promoted true -> diff --git a/deps/rabbit/src/rabbit_mnesia.erl b/deps/rabbit/src/rabbit_mnesia.erl index 703f7f039dc3..084a59ed79d6 100644 --- a/deps/rabbit/src/rabbit_mnesia.erl +++ b/deps/rabbit/src/rabbit_mnesia.erl @@ -56,6 +56,13 @@ %% Used internally in rpc calls -export([node_info/0, remove_node_if_mnesia_running/1]). +-deprecated({on_running_node, 1, + "Use rabbit_process:on_running_node/1 instead"}). +-deprecated({is_process_alive, 1, + "Use rabbit_process:is_process_alive/1 instead"}). +-deprecated({is_registered_process_alive, 1, + "Use rabbit_process:is_registered_process_alive/1 instead"}). + -ifdef(TEST). -compile(export_all). -export([init_with_lock/3]). diff --git a/deps/rabbit/src/rabbit_prequeue.erl b/deps/rabbit/src/rabbit_prequeue.erl index 02a3be53fb3d..27aac858532c 100644 --- a/deps/rabbit/src/rabbit_prequeue.erl +++ b/deps/rabbit/src/rabbit_prequeue.erl @@ -55,9 +55,9 @@ init(Q0, restart) when ?is_amqqueue(Q0) -> QPid = amqqueue:get_pid(Q1), SPids = amqqueue:get_slave_pids(Q1), LocalOrMasterDown = node(QPid) =:= node() - orelse not rabbit_mnesia:on_running_node(QPid), - Slaves = [SPid || SPid <- SPids, rabbit_mnesia:is_process_alive(SPid)], - case rabbit_mnesia:is_process_alive(QPid) of + orelse not rabbit_process:on_running_node(QPid), + Slaves = [SPid || SPid <- SPids, rabbit_process:is_process_alive(SPid)], + case rabbit_process:is_process_alive(QPid) of true -> false = LocalOrMasterDown, %% assertion rabbit_mirror_queue_slave:go(self(), async), rabbit_mirror_queue_slave:init(Q1); %% [1] diff --git a/deps/rabbit/src/rabbit_process.erl b/deps/rabbit/src/rabbit_process.erl new file mode 100644 index 000000000000..29ab7456354c --- /dev/null +++ b/deps/rabbit/src/rabbit_process.erl @@ -0,0 +1,77 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_process). + +-export([on_running_node/1, + is_process_alive/1, + is_registered_process_alive/1]). + +-spec on_running_node(Pid) -> OnRunningNode when + Pid :: pid(), + OnRunningNode :: boolean(). +%% @doc Indicates if the specified process runs on a member of the cluster. +%% +%% @param Pid the PID of the process to check +%% @returns true if the process runs on one of the cluster members, false +%% otherwise. + +on_running_node(Pid) -> + Node = node(Pid), + rabbit_nodes:is_running(Node). + +%% This requires the process be in the same running cluster as us +%% (i.e. not partitioned or some random node). +%% +%% See also rabbit_misc:is_process_alive/1 which does not. + +-spec is_process_alive(Pid) -> IsAlive when + Pid :: pid() | {RegisteredName, Node}, + RegisteredName :: atom(), + Node :: node(), + IsAlive :: boolean(). +%% @doc Indicates if the specified process is alive. +%% +%% Unlike {@link erlang:is_process_alive/1}, this function works with remote +%% processes and registered processes. However, the process must run on one of +%% the cluster members. +%% +%% @param Pid the PID or name of the process to check +%% @returns true if the process is alive runs on one of the cluster members, +%% false otherwise. + +is_process_alive(Pid) when is_pid(Pid) -> + on_running_node(Pid) + andalso + rpc:call(node(Pid), erlang, is_process_alive, [Pid]) =:= true; +is_process_alive({Name, Node}) when is_atom(Name) andalso is_atom(Node) -> + case rabbit_nodes:is_running(Node) of + true -> + try + erpc:call(Node, ?MODULE, is_registered_process_alive, [Name]) + catch + error:{exception, undef, [{?MODULE, _, _, _} | _]} -> + rpc:call( + Node, + rabbit_mnesia, is_registered_process_alive, [Name]) + end; + false -> + false + end. + +-spec is_registered_process_alive(RegisteredName) -> IsAlive when + RegisteredName :: atom(), + IsAlive :: boolean(). +%% @doc Indicates if the specified registered process is alive. +%% +%% The process must be local to this node. +%% +%% @param RegisteredName the name of the registered process +%% @returns true if the process is alive, false otherwise. + +is_registered_process_alive(Name) -> + is_pid(whereis(Name)). diff --git a/deps/rabbit_common/src/rabbit_misc.erl b/deps/rabbit_common/src/rabbit_misc.erl index 30d346b38876..a882e8fb6cdf 100644 --- a/deps/rabbit_common/src/rabbit_misc.erl +++ b/deps/rabbit_common/src/rabbit_misc.erl @@ -881,7 +881,7 @@ ntoab(IP) -> %% loop in rabbit_amqqueue:on_node_down/1 and any delays we incur %% would be bad news. %% -%% See also rabbit_mnesia:is_process_alive/1 which also requires the +%% See also rabbit_process:is_process_alive/1 which also requires the %% process be in the same running cluster as us (i.e. not partitioned %% or some random node). is_process_alive(Pid) when node(Pid) =:= node() -> From 42bcd94dce5e1aa3797329be42c82ccf2df5def9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Fri, 13 Jan 2023 18:11:38 +0100 Subject: [PATCH 2/3] rabbit_db_cluster: New module on top of databases clustering This new module sits on top of `rabbit_mnesia` and provide an API with all cluster-related functions. `rabbit_mnesia` should be called directly inside Mnesia-specific code only, `rabbit_mnesia_rename` or classic mirrored queues for instance. Otherwise, `rabbit_db_cluster` must be used. Several modules, in particular in `rabbitmq_cli`, continue to call `rabbit_mnesia` as a fallback option if the `rabbit_db_cluster` module unavailable. This will be the case when the CLI will interact with an older RabbitMQ version. This will help with the introduction of a new database backend. --- deps/rabbit/src/rabbit.erl | 2 +- deps/rabbit/src/rabbit_db.erl | 49 +++++- deps/rabbit/src/rabbit_db_cluster.erl | 150 ++++++++++++++++++ deps/rabbit/src/rabbit_mnesia.erl | 17 +- deps/rabbit/src/rabbit_node_monitor.erl | 6 +- deps/rabbit/src/rabbit_nodes.erl | 2 +- deps/rabbit/src/rabbit_prelaunch_cluster.erl | 2 +- .../test/clustering_management_SUITE.erl | 2 +- .../change_cluster_node_type_command.ex | 4 +- .../ctl/commands/cluster_status_command.ex | 14 +- .../cli/ctl/commands/force_boot_command.ex | 11 +- .../cli/ctl/commands/force_reset_command.ex | 2 +- .../commands/forget_cluster_node_command.ex | 13 +- .../cli/ctl/commands/join_cluster_command.ex | 26 ++- .../cli/ctl/commands/reset_command.ex | 2 +- .../test/terraform_SUITE.erl | 2 +- .../src/rabbit_mgmt_wm_node.erl | 2 +- .../src/rabbit_mgmt_wm_nodes.erl | 2 +- .../src/rabbit_peer_discovery_cleanup.erl | 2 +- 19 files changed, 274 insertions(+), 36 deletions(-) create mode 100644 deps/rabbit/src/rabbit_db_cluster.erl diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index a09a726f6762..236e084e3d1b 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -1026,7 +1026,7 @@ prep_stop(State) -> stop(State) -> ok = rabbit_alarm:stop(), - ok = case rabbit_mnesia:is_clustered() of + ok = case rabbit_db_cluster:is_clustered() of true -> ok; false -> rabbit_table:clear_ram_only_tables() end, diff --git a/deps/rabbit/src/rabbit_db.erl b/deps/rabbit/src/rabbit_db.erl index 0e60e2033e3e..e15b78df3caf 100644 --- a/deps/rabbit/src/rabbit_db.erl +++ b/deps/rabbit/src/rabbit_db.erl @@ -13,6 +13,9 @@ -include_lib("rabbit_common/include/logging.hrl"). -export([init/0, + reset/0, + force_reset/0, + force_load_on_next_boot/0, is_virgin_node/0, is_virgin_node/1, dir/0, ensure_dir_exists/0]). @@ -40,7 +43,7 @@ init() -> "DB: this node is virgin: ~ts", [IsVirgin], #{domain => ?RMQLOG_DOMAIN_DB}), ensure_dir_exists(), - case init_mnesia() of + case init_using_mnesia() of ok -> ?LOG_DEBUG( "DB: initialization successeful", @@ -53,7 +56,7 @@ init() -> Error end. -init_mnesia() -> +init_using_mnesia() -> ?LOG_DEBUG( "DB: initialize Mnesia", #{domain => ?RMQLOG_DOMAIN_DB}), @@ -61,6 +64,48 @@ init_mnesia() -> ?assertEqual(rabbit:data_dir(), mnesia_dir()), rabbit_sup:start_child(mnesia_sync). +-spec reset() -> Ret when + Ret :: ok. +%% @doc Resets the database and the node. + +reset() -> + reset_using_mnesia(). + +reset_using_mnesia() -> + ?LOG_DEBUG( + "DB: resetting node", + #{domain => ?RMQLOG_DOMAIN_DB}), + rabbit_mnesia:reset(). + +-spec force_reset() -> Ret when + Ret :: ok. +%% @doc Resets the database and the node. + +force_reset() -> + force_reset_using_mnesia(). + +force_reset_using_mnesia() -> + ?LOG_DEBUG( + "DB: resetting node forcefully", + #{domain => ?RMQLOG_DOMAIN_DB}), + rabbit_mnesia:force_reset(). + +-spec force_load_on_next_boot() -> Ret when + Ret :: ok. +%% @doc Requests that the database to be forcefully loaded during next boot. +%% +%% This is necessary when a node refuses to boot when the cluster is in a bad +%% state, like if critical members are MIA. + +force_load_on_next_boot() -> + force_load_on_next_boot_using_mnesia(). + +force_load_on_next_boot_using_mnesia() -> + ?LOG_DEBUG( + "DB: resetting node forcefully", + #{domain => ?RMQLOG_DOMAIN_DB}), + rabbit_mnesia:force_load_next_boot(). + -spec is_virgin_node() -> IsVirgin when IsVirgin :: boolean(). %% @doc Indicates if this RabbitMQ node is virgin. diff --git a/deps/rabbit/src/rabbit_db_cluster.erl b/deps/rabbit/src/rabbit_db_cluster.erl new file mode 100644 index 000000000000..aa89ac0b7813 --- /dev/null +++ b/deps/rabbit/src/rabbit_db_cluster.erl @@ -0,0 +1,150 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_db_cluster). + +-include_lib("kernel/include/logger.hrl"). +-include_lib("stdlib/include/assert.hrl"). + +-include_lib("rabbit_common/include/logging.hrl"). + +-export([join/2, + forget_member/2]). +-export([change_node_type/1]). +-export([is_clustered/0, + members/0, + disc_members/0, + node_type/0, + check_consistency/0, + cli_cluster_status/0]). + +-type node_type() :: disc_node_type() | ram_node_type(). +-type disc_node_type() :: disc. +-type ram_node_type() :: ram. + +-export_type([node_type/0, disc_node_type/0, ram_node_type/0]). + +-define( + IS_NODE_TYPE(NodeType), + ((NodeType) =:= disc orelse (NodeType) =:= ram)). + +%% ------------------------------------------------------------------- +%% Cluster formation. +%% ------------------------------------------------------------------- + +-spec join(RemoteNode, NodeType) -> Ret when + RemoteNode :: node(), + NodeType :: rabbit_db_cluster:node_type(), + Ret :: Ok | Error, + Ok :: ok | {ok, already_member}, + Error :: {error, {inconsistent_cluster, string()}}. +%% @doc Adds this node to a cluster using `RemoteNode' to reach it. + +join(RemoteNode, NodeType) + when is_atom(RemoteNode) andalso ?IS_NODE_TYPE(NodeType) -> + ?LOG_DEBUG( + "DB: joining cluster using remote node `~ts`", [RemoteNode], + #{domain => ?RMQLOG_DOMAIN_DB}), + join_using_mnesia(RemoteNode, NodeType). + +join_using_mnesia(RemoteNode, NodeType) -> + rabbit_mnesia:join_cluster(RemoteNode, NodeType). + +-spec forget_member(Node, RemoveWhenOffline) -> ok when + Node :: node(), + RemoveWhenOffline :: boolean(). +%% @doc Removes `Node' from the cluster. + +forget_member(Node, RemoveWhenOffline) -> + forget_member_using_mnesia(Node, RemoveWhenOffline). + +forget_member_using_mnesia(Node, RemoveWhenOffline) -> + rabbit_mnesia:forget_cluster_node(Node, RemoveWhenOffline). + +%% ------------------------------------------------------------------- +%% Cluster update. +%% ------------------------------------------------------------------- + +-spec change_node_type(NodeType) -> ok when + NodeType :: rabbit_db_cluster:node_type(). +%% @doc Changes the node type to `NodeType'. +%% +%% Node types may not all be valid with all databases. + +change_node_type(NodeType) -> + change_node_type_using_mnesia(NodeType). + +change_node_type_using_mnesia(NodeType) -> + rabbit_mnesia:change_cluster_node_type(NodeType). + +%% ------------------------------------------------------------------- +%% Cluster status. +%% ------------------------------------------------------------------- + +-spec is_clustered() -> IsClustered when + IsClustered :: boolean(). +%% @doc Indicates if this node is clustered with other nodes or not. + +is_clustered() -> + is_clustered_using_mnesia(). + +is_clustered_using_mnesia() -> + rabbit_mnesia:is_clustered(). + +-spec members() -> Members when + Members :: [node()]. +%% @doc Returns the list of cluster members. + +members() -> + members_using_mnesia(). + +members_using_mnesia() -> + mnesia:system_info(db_nodes). + +-spec disc_members() -> Members when + Members :: [node()]. +%% @private + +disc_members() -> + disc_members_using_mnesia(). + +disc_members_using_mnesia() -> + rabbit_mnesia:cluster_nodes(disc). + +-spec node_type() -> NodeType when + NodeType :: rabbit_db_cluster:node_type(). +%% @doc Returns the type of this node, `disc' or `ram'. +%% +%% Node types may not all be relevant with all databases. + +node_type() -> + node_type_using_mnesia(). + +node_type_using_mnesia() -> + rabbit_mnesia:node_type(). + +-spec check_consistency() -> ok. +%% @doc Ensures the cluster is consistent. + +check_consistency() -> + check_consistency_using_mnesia(). + +check_consistency_using_mnesia() -> + rabbit_mnesia:check_cluster_consistency(). + +-spec cli_cluster_status() -> ClusterStatus when + ClusterStatus :: [{nodes, [{rabbit_db_cluster:node_type(), [node()]}]} | + {running_nodes, [node()]} | + {partitions, [{node(), [node()]}]}]. +%% @doc Returns information from the cluster for the `cluster_status' CLI +%% command. + +cli_cluster_status() -> + cli_cluster_status_using_mnesia(). + +cli_cluster_status_using_mnesia() -> + rabbit_mnesia:status(). diff --git a/deps/rabbit/src/rabbit_mnesia.erl b/deps/rabbit/src/rabbit_mnesia.erl index 084a59ed79d6..bc4426c31479 100644 --- a/deps/rabbit/src/rabbit_mnesia.erl +++ b/deps/rabbit/src/rabbit_mnesia.erl @@ -70,9 +70,8 @@ %%---------------------------------------------------------------------------- --export_type([node_type/0, cluster_status/0]). +-export_type([cluster_status/0]). --type node_type() :: disc | ram. -type cluster_status() :: {[node()], [node()], [node()]}. %%---------------------------------------------------------------------------- @@ -137,12 +136,12 @@ init_with_lock(Retries, Timeout, RunPeerDiscovery) -> init_with_lock(Retries - 1, Timeout, RunPeerDiscovery) end. --spec run_peer_discovery() -> ok | {[node()], node_type()}. +-spec run_peer_discovery() -> ok | {[node()], rabbit_db_cluster:node_type()}. run_peer_discovery() -> {RetriesLeft, DelayInterval} = rabbit_peer_discovery:discovery_retries(), run_peer_discovery_with_retries(RetriesLeft, DelayInterval). --spec run_peer_discovery_with_retries(non_neg_integer(), non_neg_integer()) -> ok | {[node()], node_type()}. +-spec run_peer_discovery_with_retries(non_neg_integer(), non_neg_integer()) -> ok | {[node()], rabbit_db_cluster:node_type()}. run_peer_discovery_with_retries(0, _DelayInterval) -> ok; run_peer_discovery_with_retries(RetriesLeft, DelayInterval) -> @@ -228,7 +227,7 @@ join_discovered_peers_with_retries(TryNodes, NodeType, RetriesLeft, DelayInterva %% all in the same cluster, we simply pick the first online node and %% we cluster to its cluster. --spec join_cluster(node(), node_type()) +-spec join_cluster(node(), rabbit_db_cluster:node_type()) -> ok | {ok, already_member} | {error, {inconsistent_cluster, string()}}. join_cluster(DiscoveryNode, NodeType) -> @@ -317,7 +316,7 @@ wipe() -> ok = rabbit_node_monitor:reset_cluster_status(), ok. --spec change_cluster_node_type(node_type()) -> 'ok'. +-spec change_cluster_node_type(rabbit_db_cluster:node_type()) -> 'ok'. change_cluster_node_type(Type) -> ensure_mnesia_not_running(), @@ -421,7 +420,7 @@ remove_node_offline_node(Node) -> %% Queries %%---------------------------------------------------------------------------- --spec status() -> [{'nodes', [{node_type(), [node()]}]} | +-spec status() -> [{'nodes', [{rabbit_db_cluster:node_type(), [node()]}]} | {'running_nodes', [node()]} | {'partitions', [{node(), [node()]}]}]. @@ -539,7 +538,7 @@ node_info() -> mnesia:system_info(protocol_version), cluster_status_from_mnesia()}. --spec node_type() -> node_type(). +-spec node_type() -> rabbit_db_cluster:node_type(). node_type() -> {_AllNodes, DiscNodes, _RunningNodes} = @@ -607,7 +606,7 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) -> rabbit_node_monitor:update_cluster_status(), ok. --spec init_db_unchecked([node()], node_type()) -> 'ok'. +-spec init_db_unchecked([node()], rabbit_db_cluster:node_type()) -> 'ok'. init_db_unchecked(ClusterNodes, NodeType) -> init_db(ClusterNodes, NodeType, false). diff --git a/deps/rabbit/src/rabbit_node_monitor.erl b/deps/rabbit/src/rabbit_node_monitor.erl index 17d63a683178..d469830c9b0c 100644 --- a/deps/rabbit/src/rabbit_node_monitor.erl +++ b/deps/rabbit/src/rabbit_node_monitor.erl @@ -169,7 +169,7 @@ notify_joined_cluster() -> NewMember = node(), Nodes = rabbit_nodes:list_running() -- [NewMember], gen_server:abcast(Nodes, ?SERVER, - {joined_cluster, node(), rabbit_mnesia:node_type()}), + {joined_cluster, node(), rabbit_db_cluster:node_type()}), ok. @@ -415,9 +415,9 @@ handle_call(_Request, _From, State) -> handle_cast(notify_node_up, State = #state{guid = GUID}) -> Nodes = rabbit_nodes:list_running() -- [node()], gen_server:abcast(Nodes, ?SERVER, - {node_up, node(), rabbit_mnesia:node_type(), GUID}), + {node_up, node(), rabbit_db_cluster:node_type(), GUID}), %% register other active rabbits with this rabbit - DiskNodes = rabbit_mnesia:cluster_nodes(disc), + DiskNodes = rabbit_db_cluster:disc_members(), [gen_server:cast(?SERVER, {node_up, N, case lists:member(N, DiskNodes) of true -> disc; false -> ram diff --git a/deps/rabbit/src/rabbit_nodes.erl b/deps/rabbit/src/rabbit_nodes.erl index a9eb9dad7060..f2e610d3eb1f 100644 --- a/deps/rabbit/src/rabbit_nodes.erl +++ b/deps/rabbit/src/rabbit_nodes.erl @@ -180,7 +180,7 @@ is_member(Node) when is_atom(Node) -> %% @see filter_members/1. list_members() -> - mnesia:system_info(db_nodes). + rabbit_db_cluster:members(). -spec filter_members(Nodes) -> Nodes when Nodes :: [node()]. diff --git a/deps/rabbit/src/rabbit_prelaunch_cluster.erl b/deps/rabbit/src/rabbit_prelaunch_cluster.erl index c63ebb4b76fa..7effd20cc4c1 100644 --- a/deps/rabbit/src/rabbit_prelaunch_cluster.erl +++ b/deps/rabbit/src/rabbit_prelaunch_cluster.erl @@ -30,5 +30,5 @@ setup(Context) -> ?LOG_DEBUG( "Checking cluster consistency", [], #{domain => ?RMQLOG_DOMAIN_PRELAUNCH}), - rabbit_mnesia:check_cluster_consistency(), + rabbit_db_cluster:check_consistency(), ok. diff --git a/deps/rabbit/test/clustering_management_SUITE.erl b/deps/rabbit/test/clustering_management_SUITE.erl index 1e9e74992762..2f7e9f54987f 100644 --- a/deps/rabbit/test/clustering_management_SUITE.erl +++ b/deps/rabbit/test/clustering_management_SUITE.erl @@ -780,7 +780,7 @@ wait_for_cluster_status(N, Max, Status, AllNodes, Nodes) -> verify_status_equal(Node, Status, AllNodes) -> NodeStatus = sort_cluster_status(cluster_status(Node)), - (AllNodes =/= [Node]) =:= rpc:call(Node, rabbit_mnesia, is_clustered, []) + (AllNodes =/= [Node]) =:= rpc:call(Node, rabbit_db_cluster, is_clustered, []) andalso NodeStatus =:= Status. cluster_status(Node) -> diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/change_cluster_node_type_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/change_cluster_node_type_command.ex index bf3b43e7c49c..caccd1d7a8b3 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/change_cluster_node_type_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/change_cluster_node_type_command.ex @@ -29,14 +29,14 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ChangeClusterNodeTypeCommand do def run([node_type_arg], %{node: node_name}) do normalized_type = normalize_type(String.to_atom(node_type_arg)) - current_type = :rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :node_type, []) + current_type = :rabbit_misc.rpc_call(node_name, :rabbit_db_cluster, :node_type, []) case current_type do ^normalized_type -> {:ok, "Node type is already #{normalized_type}"} _ -> - :rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :change_cluster_node_type, [ + :rabbit_misc.rpc_call(node_name, :rabbit_db_cluster, :change_node_type, [ normalized_type ]) end diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/cluster_status_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/cluster_status_command.ex index 3217664047ae..b683d0bcc9c5 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/cluster_status_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/cluster_status_command.ex @@ -33,7 +33,19 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ClusterStatusCommand do use RabbitMQ.CLI.Core.RequiresRabbitAppRunning def run([], %{node: node_name, timeout: timeout}) do - case :rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :status, []) do + status = + case :rabbit_misc.rpc_call(node_name, :rabbit_db_cluster, :cli_cluster_status, []) do + {:badrpc, {:EXIT, {:undef, _}}} -> + :rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :status, []) + + {:badrpc, _} = err -> + err + + status -> + status + end + + case status do {:badrpc, _} = err -> err diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_boot_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_boot_command.ex index 978ee0df01e4..00c4101d6a85 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_boot_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_boot_command.ex @@ -24,7 +24,16 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ForceBootCommand do end def run([], %{node: node_name} = opts) do - case :rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :force_load_next_boot, []) do + ret = + case :rabbit_misc.rpc_call(node_name, :rabbit_db, :force_load_on_next_boot, []) do + {:badrpc, {:EXIT, {:undef, _}}} -> + :rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :force_load_next_boot, []) + + ret0 -> + ret0 + end + + case ret do {:badrpc, :nodedown} -> case Config.get_option(:data_dir, opts) do nil -> diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_reset_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_reset_command.ex index b8e87d4810a1..124dc28cd36d 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_reset_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_reset_command.ex @@ -14,7 +14,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ForceResetCommand do use RabbitMQ.CLI.Core.RequiresRabbitAppStopped def run([], %{node: node_name}) do - :rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :force_reset, []) + :rabbit_misc.rpc_call(node_name, :rabbit_db, :force_reset, []) end def output({:error, :mnesia_unexpectedly_running}, %{node: node_name}) do diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/forget_cluster_node_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/forget_cluster_node_command.ex index 064b51baf07f..fb8f662ce11c 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/forget_cluster_node_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/forget_cluster_node_command.ex @@ -39,7 +39,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ForgetClusterNodeCommand do become(node_name, opts), RabbitMQ.CLI.Core.Helpers.defer(fn -> _ = :rabbit_event.start_link() - :rabbit_mnesia.forget_cluster_node(to_atom(node_to_remove), true) + :rabbit_db_cluster.forget_member(to_atom(node_to_remove), true) end) ]) end @@ -48,7 +48,16 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ForgetClusterNodeCommand do atom_name = to_atom(node_to_remove) args = [atom_name, false] - case :rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :forget_cluster_node, args) do + ret = + case :rabbit_misc.rpc_call(node_name, :rabbit_db_cluster, :forget_member, args) do + {:badrpc, {:EXIT, {:undef, _}}} -> + :rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :forget_cluster_node, args) + + ret0 -> + ret0 + end + + case ret do {:error, {:failed_to_remove_node, ^atom_name, {:active, _, _}}} -> {:error, "RabbitMQ on node #{node_to_remove} must be stopped with 'rabbitmqctl -n #{node_to_remove} stop_app' before it can be removed"} diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/join_cluster_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/join_cluster_command.ex index f75498831fdb..11b9695ac508 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/join_cluster_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/join_cluster_command.ex @@ -42,12 +42,26 @@ defmodule RabbitMQ.CLI.Ctl.Commands.JoinClusterCommand do long_or_short_names = Config.get_option(:longnames, opts) target_node_normalised = Helpers.normalise_node(target_node, long_or_short_names) - :rabbit_misc.rpc_call( - node_name, - :rabbit_mnesia, - :join_cluster, - [target_node_normalised, node_type] - ) + ret = + :rabbit_misc.rpc_call( + node_name, + :rabbit_db_cluster, + :join, + [target_node_normalised, node_type] + ) + + case ret do + {:badrpc, {:EXIT, {:undef, _}}} -> + :rabbit_misc.rpc_call( + node_name, + :rabbit_mnesia, + :join_cluster, + [target_node_normalised, node_type] + ) + + _ -> + ret + end end def output({:ok, :already_member}, _) do diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/reset_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/reset_command.ex index 7331451fa7fc..dfd85631f25d 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/reset_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/reset_command.ex @@ -14,7 +14,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ResetCommand do use RabbitMQ.CLI.Core.RequiresRabbitAppStopped def run([], %{node: node_name}) do - :rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :reset, []) + :rabbit_misc.rpc_call(node_name, :rabbit_db, :reset, []) end def output({:error, :mnesia_unexpectedly_running}, %{node: node_name}) do diff --git a/deps/rabbitmq_ct_helpers/test/terraform_SUITE.erl b/deps/rabbitmq_ct_helpers/test/terraform_SUITE.erl index 34a60619b929..c5c375b2082c 100644 --- a/deps/rabbitmq_ct_helpers/test/terraform_SUITE.erl +++ b/deps/rabbitmq_ct_helpers/test/terraform_SUITE.erl @@ -159,7 +159,7 @@ run_four_rabbitmq_nodes(Config) -> ?assertEqual([true, true, true, true], rabbit_ct_broker_helpers:rpc_all( - Config, rabbit_mnesia, is_clustered, [])), + Config, rabbit_db_cluster, is_clustered, [])), ClusteredNodes = lists:sort( rabbit_ct_broker_helpers:rpc( Config, 0, rabbit_nodes, list_running, [])), diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_node.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_node.erl index 69d31d1543dd..7a79af9a6e9c 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_wm_node.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_node.erl @@ -59,7 +59,7 @@ augment(Key, ReqData, Node, Data) -> end. node_data(Node, ReqData) -> - S = rabbit_mnesia:status(), + S = rabbit_db_cluster:cli_cluster_status(), Nodes = proplists:get_value(nodes, S), Running = proplists:get_value(running_nodes, S), Type = find_type(Node, Nodes), diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_nodes.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_nodes.erl index 6c95cae38ba6..52c94490fbe9 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_wm_nodes.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_nodes.erl @@ -47,7 +47,7 @@ all_nodes(ReqData) -> end. all_nodes_raw() -> - S = rabbit_mnesia:status(), + S = rabbit_db_cluster:cli_cluster_status(), Nodes = proplists:get_value(nodes, S), Types = proplists:get_keys(Nodes), Running = proplists:get_value(running_nodes, S), diff --git a/deps/rabbitmq_peer_discovery_common/src/rabbit_peer_discovery_cleanup.erl b/deps/rabbitmq_peer_discovery_common/src/rabbit_peer_discovery_cleanup.erl index 4a38fabe0cbd..52289a3186e4 100644 --- a/deps/rabbitmq_peer_discovery_common/src/rabbit_peer_discovery_cleanup.erl +++ b/deps/rabbitmq_peer_discovery_common/src/rabbit_peer_discovery_cleanup.erl @@ -277,7 +277,7 @@ maybe_remove_nodes([Node | Nodes], false) -> ?LOG_WARNING( "Peer discovery: removing unknown node ~ts from the cluster", [Node], #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), - rabbit_mnesia:forget_cluster_node(Node, false), + rabbit_db_cluster:forget_member(Node, false), maybe_remove_nodes(Nodes, false). %%-------------------------------------------------------------------- From cd2b4c7585bd51d09fc971c2c4bea03f7717d412 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Tue, 21 Feb 2023 11:39:30 +0100 Subject: [PATCH 3/3] Use `rabbit_nodes` to list nodes in two plugins ... instead of using an internal implementation. The plugins are `rabbitmq_peer_discovery_common` and `rabbitmq_sharding`. References #7058. --- .../src/rabbit_peer_discovery_cleanup.erl | 8 +------- deps/rabbitmq_sharding/src/rabbit_sharding_shard.erl | 5 +---- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/deps/rabbitmq_peer_discovery_common/src/rabbit_peer_discovery_cleanup.erl b/deps/rabbitmq_peer_discovery_common/src/rabbit_peer_discovery_cleanup.erl index 52289a3186e4..6ae37068d1bd 100644 --- a/deps/rabbitmq_peer_discovery_common/src/rabbit_peer_discovery_cleanup.erl +++ b/deps/rabbitmq_peer_discovery_common/src/rabbit_peer_discovery_cleanup.erl @@ -288,13 +288,7 @@ maybe_remove_nodes([Node | Nodes], false) -> %%-------------------------------------------------------------------- -spec unreachable_nodes() -> [node()]. unreachable_nodes() -> - Status = rabbit_mnesia:status(), - Nodes = proplists:get_value(nodes, Status, []), - Running = proplists:get_value(running_nodes, Status, []), - All = lists:merge(proplists:get_value(disc, Nodes, []), - proplists:get_value(ram, Nodes, [])), - lists:subtract(All, Running). - + rabbit_nodes:list_unreachable(). %%-------------------------------------------------------------------- %% @private diff --git a/deps/rabbitmq_sharding/src/rabbit_sharding_shard.erl b/deps/rabbitmq_sharding/src/rabbit_sharding_shard.erl index fe0344bfc157..81b127190dc2 100644 --- a/deps/rabbitmq_sharding/src/rabbit_sharding_shard.erl +++ b/deps/rabbitmq_sharding/src/rabbit_sharding_shard.erl @@ -127,7 +127,4 @@ v(#resource{virtual_host = VHost}) -> VHost. foreach_node(F) -> - [F(Node) || Node <- running_nodes()]. - -running_nodes() -> - proplists:get_value(running_nodes, rabbit_mnesia:status(), []). + [F(Node) || Node <- rabbit_nodes:list_running()].