Skip to content

Commit ee94e5e

Browse files
committed
rabbit_nodes: Add list functions to clarify which nodes we are interested in
So far, we had the following functions to list nodes in a RabbitMQ cluster:
* `rabbit_mnesia:cluster_nodes/1` to get members of the Mnesia cluster; the argument was used to select members (all members or only those running Mnesia and participating in the cluster)
* `rabbit_nodes:all/0` to get all members of the Mnesia cluster
* `rabbit_nodes:all_running/0` to get all members who currently run Mnesia

Basically:
* `rabbit_nodes:all/0` calls `rabbit_mnesia:cluster_nodes(all)`
* `rabbit_nodes:all_running/0` calls `rabbit_mnesia:cluster_nodes(running)`

We also have:
* `rabbit_node_monitor:alive_nodes/1` which filters the given list of nodes to only select those currently running Mnesia
* `rabbit_node_monitor:alive_rabbit_nodes/1` which filters the given list of nodes to only select those currently running RabbitMQ

Most of the code uses `rabbit_mnesia:cluster_nodes/1` or the `rabbit_nodes:all*/0` functions. `rabbit_mnesia:cluster_nodes(running)` or `rabbit_nodes:all_running/0` is often used as a close approximation of "all cluster members running RabbitMQ". This list might be incorrect at times when a node is joining the cluster or is being worked on (i.e. Mnesia is running but not RabbitMQ). With Khepri, there won't be the same possible approximation because we will try to keep Khepri/Ra running even if RabbitMQ is stopped to expand/shrink the cluster.
So in order to clarify what we want when we query a list of nodes, this patch introduces the following functions:
* `rabbit_nodes:list_members/0` to get all cluster members, regardless of their state
* `rabbit_nodes:list_reachable/0` to get all cluster members we can reach using Erlang distribution, regardless of the state of RabbitMQ
* `rabbit_nodes:list_running/0` to get all cluster members who run RabbitMQ, regardless of the maintenance state
* `rabbit_nodes:list_serving/0` to get all cluster members who run RabbitMQ and are accepting clients

In addition to the list functions, there are the corresponding `rabbit_nodes:filter_*(Nodes)` filtering functions. The code is modified to use these new functions. One possible significant change is that the new list functions will perform RPC calls to query the nodes' state, unlike `rabbit_mnesia:cluster_nodes(running)`.
1 parent b795d61 commit ee94e5e

40 files changed

+456
-116
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ rebalance(Type, VhostSpec, QueueSpec) ->
379379
maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
380380
rabbit_log:info("Starting queue rebalance operation: '~ts' for vhosts matching '~ts' and queues matching '~ts'",
381381
[Type, VhostSpec, QueueSpec]),
382-
Running = rabbit_maintenance:filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running()),
382+
Running = rabbit_maintenance:filter_out_drained_nodes_consistent_read(rabbit_nodes:list_running()),
383383
NumRunning = length(Running),
384384
ToRebalance = [Q || Q <- rabbit_amqqueue:list(),
385385
filter_per_type(Type, Q),
@@ -1056,7 +1056,7 @@ check_queue_type(_Val, _Args) ->
10561056

10571057
list() ->
10581058
All = rabbit_db_queue:get_all(),
1059-
NodesRunning = rabbit_nodes:all_running(),
1059+
NodesRunning = rabbit_nodes:list_running(),
10601060
lists:filter(fun (Q) ->
10611061
Pid = amqqueue:get_pid(Q),
10621062
St = amqqueue:get_state(Q),
@@ -1238,7 +1238,7 @@ is_in_virtual_host(Q, VHostName) ->
12381238
-spec list(vhost:name()) -> [amqqueue:amqqueue()].
12391239
list(VHostPath) ->
12401240
All = rabbit_db_queue:get_all(VHostPath),
1241-
NodesRunning = rabbit_nodes:all_running(),
1241+
NodesRunning = rabbit_nodes:list_running(),
12421242
lists:filter(fun (Q) ->
12431243
Pid = amqqueue:get_pid(Q),
12441244
St = amqqueue:get_state(Q),
@@ -1252,7 +1252,7 @@ list_down(VHostPath) ->
12521252
false -> [];
12531253
true ->
12541254
Alive = sets:from_list([amqqueue:get_name(Q) || Q <- list(VHostPath)]),
1255-
NodesRunning = rabbit_nodes:all_running(),
1255+
NodesRunning = rabbit_nodes:list_running(),
12561256
rabbit_db_queue:filter_all_durable(
12571257
fun (Q) ->
12581258
N = amqqueue:get_name(Q),
@@ -1356,7 +1356,7 @@ emit_info_all(Nodes, VHostPath, Items, Ref, AggregatorPid) ->
13561356
rabbit_control_misc:await_emitters_termination(Pids).
13571357

13581358
collect_info_all(VHostPath, Items) ->
1359-
Nodes = rabbit_nodes:all_running(),
1359+
Nodes = rabbit_nodes:list_running(),
13601360
Ref = make_ref(),
13611361
Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, self()]) || Node <- Nodes ],
13621362
rabbit_control_misc:await_emitters_termination(Pids),
@@ -1744,10 +1744,8 @@ forget_node_for_queue(DeadNode, [H|T], Q) when ?is_amqqueue(Q) ->
17441744
node_permits_offline_promotion(Node) ->
17451745
case node() of
17461746
Node -> not rabbit:is_running(); %% [1]
1747-
_ -> All = rabbit_nodes:all(),
1748-
Running = rabbit_nodes:all_running(),
1749-
lists:member(Node, All) andalso
1750-
not lists:member(Node, Running) %% [2]
1747+
_ -> NotRunning = rabbit_nodes:list_not_running(),
1748+
lists:member(Node, NotRunning) %% [2]
17511749
end.
17521750
%% [1] In this case if we are a real running node (i.e. rabbitmqctl
17531751
%% has RPCed into us) then we cannot allow promotion. If on the other

deps/rabbit/src/rabbit_autoheal.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ enabled() ->
145145
end.
146146

147147
leader() ->
148-
[Leader | _] = lists:usort(rabbit_nodes:all()),
148+
[Leader | _] = lists:usort(rabbit_nodes:list_members()),
149149
Leader.
150150

151151
%% This is the winner receiving its last notification that a node has
@@ -411,7 +411,7 @@ partition_value(Partition) ->
411411
%% only know which nodes we have been partitioned from, not which
412412
%% nodes are partitioned from each other.
413413
check_other_nodes(LocalPartitions) ->
414-
Nodes = rabbit_nodes:all(),
414+
Nodes = rabbit_nodes:list_members(),
415415
{Results, Bad} = rabbit_node_monitor:status(Nodes -- [node()]),
416416
RemotePartitions = [{Node, proplists:get_value(partitions, Res)}
417417
|| {Node, Res} <- Results],

deps/rabbit/src/rabbit_channel.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ send_drained(Pid, CTagCredit) ->
369369
-spec list() -> [pid()].
370370

371371
list() ->
372-
Nodes = rabbit_nodes:all_running(),
372+
Nodes = rabbit_nodes:list_running(),
373373
rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_channel, list_local, [], ?RPC_TIMEOUT).
374374

375375
-spec list_local() -> [pid()].

deps/rabbit/src/rabbit_channel_tracking.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ list() ->
264264
lists:foldl(
265265
fun (Node, Acc) ->
266266
Acc ++ list_on_node(Node)
267-
end, [], rabbit_nodes:all_running()).
267+
end, [], rabbit_nodes:list_running()).
268268

269269
-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_channel()].
270270

@@ -309,7 +309,7 @@ list_on_node_mnesia(Node) ->
309309
#tracked_channel{_ = '_'})
310310
catch exit:{aborted, {no_exists, _}} ->
311311
%% The table might not exist yet (or is already gone)
312-
%% between the time rabbit_nodes:all_running() runs and
312+
%% between the time rabbit_nodes:list_running() runs and
313313
%% returns a specific node, and
314314
%% mnesia:dirty_match_object() is called for that node's
315315
%% table.

deps/rabbit/src/rabbit_connection_tracking.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ get_all_tracked_connection_table_names_for_node(Node) ->
435435
-spec lookup(rabbit_types:connection_name()) -> rabbit_types:tracked_connection() | 'not_found'.
436436

437437
lookup(Name) ->
438-
Nodes = rabbit_nodes:all_running(),
438+
Nodes = rabbit_nodes:list_running(),
439439
lookup(Name, Nodes).
440440

441441
lookup(_, []) ->
@@ -470,15 +470,15 @@ list() ->
470470
lists:foldl(
471471
fun (Node, Acc) ->
472472
Acc ++ list_on_node(Node)
473-
end, [], rabbit_nodes:all_running()).
473+
end, [], rabbit_nodes:list_running()).
474474

475475
-spec count() -> non_neg_integer().
476476

477477
count() ->
478478
lists:foldl(
479479
fun (Node, Acc) ->
480480
count_on_node(Node) + Acc
481-
end, 0, rabbit_nodes:all_running()).
481+
end, 0, rabbit_nodes:list_running()).
482482

483483
count_on_node(Node) ->
484484
case rabbit_feature_flags:is_enabled(tracking_records_in_ets) of
@@ -551,7 +551,7 @@ list_on_node_mnesia(Node) ->
551551
#tracked_connection{_ = '_'})
552552
catch exit:{aborted, {no_exists, _}} ->
553553
%% The table might not exist yet (or is already gone)
554-
%% between the time rabbit_nodes:all_running() runs and
554+
%% between the time rabbit_nodes:list_running() runs and
555555
%% returns a specific node, and
556556
%% mnesia:dirty_match_object() is called for that node's
557557
%% table.

deps/rabbit/src/rabbit_core_metrics_gc.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ gc_exchanges() ->
102102
gc_process_and_entity(channel_exchange_metrics, GbSet).
103103

104104
gc_nodes() ->
105-
Nodes = rabbit_nodes:all(),
105+
Nodes = rabbit_nodes:list_members(),
106106
GbSet = gb_sets:from_list(Nodes),
107107
gc_entity(node_node_metrics, GbSet).
108108

deps/rabbit/src/rabbit_direct.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ list_local() ->
4444
-spec list() -> [pid()].
4545

4646
list() ->
47-
Nodes = rabbit_nodes:all_running(),
47+
Nodes = rabbit_nodes:list_running(),
4848
rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_direct, list_local, [], ?RPC_TIMEOUT).
4949

5050
%%----------------------------------------------------------------------------

deps/rabbit/src/rabbit_ff_controller.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -914,21 +914,21 @@ post_enable(#{states_per_node := _}, FeatureName, Nodes) ->
914914

915915
-ifndef(TEST).
916916
all_nodes() ->
917-
lists:usort([node() | mnesia:system_info(db_nodes)]).
917+
lists:usort([node() | rabbit_nodes:list_members()]).
918918

919919
running_nodes() ->
920-
lists:usort([node() | mnesia:system_info(running_db_nodes)]).
920+
lists:usort([node() | rabbit_nodes:list_running()]).
921921
-else.
922922
all_nodes() ->
923923
RemoteNodes = case rabbit_feature_flags:get_overriden_nodes() of
924-
undefined -> mnesia:system_info(db_nodes);
924+
undefined -> rabbit_nodes:list_members();
925925
Nodes -> Nodes
926926
end,
927927
lists:usort([node() | RemoteNodes]).
928928

929929
running_nodes() ->
930930
RemoteNodes = case rabbit_feature_flags:get_overriden_running_nodes() of
931-
undefined -> mnesia:system_info(running_db_nodes);
931+
undefined -> rabbit_nodes:list_running();
932932
Nodes -> Nodes
933933
end,
934934
lists:usort([node() | RemoteNodes]).

deps/rabbit/src/rabbit_maintenance.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ stop_local_quorum_queue_followers() ->
290290

291291
-spec primary_replica_transfer_candidate_nodes() -> [node()].
292292
primary_replica_transfer_candidate_nodes() ->
293-
filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running() -- [node()]).
293+
filter_out_drained_nodes_consistent_read(rabbit_nodes:list_running() -- [node()]).
294294

295295
-spec random_primary_replica_transfer_candidate_node([node()], [node()]) -> {ok, node()} | undefined.
296296
random_primary_replica_transfer_candidate_node([], _Preferred) ->

deps/rabbit/src/rabbit_mirror_queue_misc.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
154154
slaves_to_start_on_failure(Q, DeadGMPids) ->
155155
%% In case Mnesia has not caught up yet, filter out nodes we know
156156
%% to be dead..
157-
ClusterNodes = rabbit_nodes:all_running() --
157+
ClusterNodes = rabbit_nodes:list_running() --
158158
[node(P) || P <- DeadGMPids],
159159
{_, OldNodes, _} = actual_queue_nodes(Q),
160160
{_, NewNodes} = suggested_queue_nodes(Q, ClusterNodes),
@@ -321,7 +321,7 @@ store_updated_slaves(Q0) when ?is_amqqueue(Q0) ->
321321
%% a long time without being removed.
322322
update_recoverable(SPids, RS) ->
323323
SNodes = [node(SPid) || SPid <- SPids],
324-
RunningNodes = rabbit_nodes:all_running(),
324+
RunningNodes = rabbit_nodes:list_running(),
325325
AddNodes = SNodes -- RS,
326326
DelNodes = RunningNodes -- SNodes, %% i.e. running with no slave
327327
(RS -- DelNodes) ++ AddNodes.
@@ -375,17 +375,17 @@ promote_slave([SPid | SPids]) ->
375375
-spec initial_queue_node(amqqueue:amqqueue(), node()) -> node().
376376

377377
initial_queue_node(Q, DefNode) ->
378-
{MNode, _SNodes} = suggested_queue_nodes(Q, DefNode, rabbit_nodes:all_running()),
378+
{MNode, _SNodes} = suggested_queue_nodes(Q, DefNode, rabbit_nodes:list_running()),
379379
MNode.
380380

381381
-spec suggested_queue_nodes(amqqueue:amqqueue()) ->
382382
{node(), [node()]}.
383383

384-
suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, rabbit_nodes:all_running()).
384+
suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, rabbit_nodes:list_running()).
385385
suggested_queue_nodes(Q, All) -> suggested_queue_nodes(Q, node(), All).
386386

387387
%% The third argument exists so we can pull a call to
388-
%% rabbit_nodes:all_running() out of a loop or transaction
388+
%% rabbit_nodes:list_running() out of a loop or transaction
389389
%% or both.
390390
suggested_queue_nodes(Q, DefNode, All) when ?is_amqqueue(Q) ->
391391
Owner = amqqueue:get_exclusive_owner(Q),

0 commit comments

Comments
 (0)