Skip to content

Commit ff6a4ac

Browse files
committed
rabbit_nodes: Add list functions to clarify which nodes we are interested in
So far, we had the following functions to list nodes in a RabbitMQ cluster: * `rabbit_mnesia:cluster_nodes/1` to get members of the Mnesia cluster; the argument was used to select members (all members or only those running Mnesia and participating in the cluster) * `rabbit_nodes:all/0` to get all members of the Mnesia cluster * `rabbit_nodes:all_running/0` to get all members who currently run Mnesia Basically: * `rabbit_nodes:all/0` calls `rabbit_mnesia:cluster_nodes(all)` * `rabbit_nodes:all_running/0` calls `rabbit_mnesia:cluster_nodes(running)` We also have: * `rabbit_node_monitor:alive_nodes/1` which filters the given list of nodes to only select those currently running Mnesia * `rabbit_node_monitor:alive_rabbit_nodes/1` which filters the given list of nodes to only select those currently running RabbitMQ Most of the code uses `rabbit_mnesia:cluster_nodes/1` or the `rabbit_nodes:all*/0` functions. `rabbit_mnesia:cluster_nodes(running)` or `rabbit_nodes:all_running/0` is often used as a close approximation of "all cluster members running RabbitMQ". This list might be incorrect in times where a node is joining the clustered or is being worked on (i.e. Mnesia is running but not RabbitMQ). With Khepri, there won't be the same possible approximation because we will try to keep Khepri/Ra running even if RabbitMQ is stopped to expand/shrink the cluster. So in order to clarify what we want when we query a list of nodes, this patch introduces the following functions: * `rabbit_nodes:list_members/0` to get all cluster members, regardless of their state * `rabbit_nodes:list_reachable/0` to get all cluster members we can reach using Erlang distribution, regardless of the state of RabbitMQ * `rabbit_nodes:list_running/0` to get all cluster members who run RabbitMQ, regardless of the maintenance state * `rabbit_nodes:list_serving/0` to get all cluster members who run RabbitMQ and are accepting clients In addition to the list functions, there are the corresponding `rabbit_nodes:filter_*(Nodes)` filtering functions. The code is modified to use these new functions. One possible significant change is that the new list functions will perform RPC calls to query the nodes' state, unlike `rabbit_mnesia:cluster_nodes(running)`.
1 parent 70094e6 commit ff6a4ac

37 files changed

+453
-112
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ rebalance(Type, VhostSpec, QueueSpec) ->
441441
maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
442442
rabbit_log:info("Starting queue rebalance operation: '~ts' for vhosts matching '~ts' and queues matching '~ts'",
443443
[Type, VhostSpec, QueueSpec]),
444-
Running = rabbit_maintenance:filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running()),
444+
Running = rabbit_maintenance:filter_out_drained_nodes_consistent_read(rabbit_nodes:list_running()),
445445
NumRunning = length(Running),
446446
ToRebalance = [Q || Q <- rabbit_amqqueue:list(),
447447
filter_per_type(Type, Q),
@@ -1121,7 +1121,7 @@ list() ->
11211121

11221122
do_list() ->
11231123
All = mnesia:dirty_match_object(rabbit_queue, amqqueue:pattern_match_all()),
1124-
NodesRunning = rabbit_nodes:all_running(),
1124+
NodesRunning = rabbit_nodes:list_running(),
11251125
lists:filter(fun (Q) ->
11261126
Pid = amqqueue:get_pid(Q),
11271127
St = amqqueue:get_state(Q),
@@ -1307,7 +1307,7 @@ is_in_virtual_host(Q, VHostName) ->
13071307
-spec list(vhost:name()) -> [amqqueue:amqqueue()].
13081308
list(VHostPath) ->
13091309
All = list(VHostPath, rabbit_queue),
1310-
NodesRunning = rabbit_nodes:all_running(),
1310+
NodesRunning = rabbit_nodes:list_running(),
13111311
lists:filter(fun (Q) ->
13121312
Pid = amqqueue:get_pid(Q),
13131313
St = amqqueue:get_state(Q),
@@ -1368,7 +1368,7 @@ list_down(VHostPath) ->
13681368
true ->
13691369
Alive = sets:from_list([amqqueue:get_name(Q) || Q <- list(VHostPath)]),
13701370
Durable = list(VHostPath, rabbit_durable_queue),
1371-
NodesRunning = rabbit_nodes:all_running(),
1371+
NodesRunning = rabbit_nodes:list_running(),
13721372
lists:filter(fun (Q) ->
13731373
N = amqqueue:get_name(Q),
13741374
Pid = amqqueue:get_pid(Q),
@@ -1488,7 +1488,7 @@ emit_info_all(Nodes, VHostPath, Items, Ref, AggregatorPid) ->
14881488
rabbit_control_misc:await_emitters_termination(Pids).
14891489

14901490
collect_info_all(VHostPath, Items) ->
1491-
Nodes = rabbit_nodes:all_running(),
1491+
Nodes = rabbit_nodes:list_running(),
14921492
Ref = make_ref(),
14931493
Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, self()]) || Node <- Nodes ],
14941494
rabbit_control_misc:await_emitters_termination(Pids),
@@ -1904,10 +1904,8 @@ forget_node_for_queue(DeadNode, [H|T], Q) when ?is_amqqueue(Q) ->
19041904
node_permits_offline_promotion(Node) ->
19051905
case node() of
19061906
Node -> not rabbit:is_running(); %% [1]
1907-
_ -> All = rabbit_nodes:all(),
1908-
Running = rabbit_nodes:all_running(),
1909-
lists:member(Node, All) andalso
1910-
not lists:member(Node, Running) %% [2]
1907+
_ -> NotRunning = rabbit_nodes:list_not_running(),
1908+
lists:member(Node, NotRunning) %% [2]
19111909
end.
19121910
%% [1] In this case if we are a real running node (i.e. rabbitmqctl
19131911
%% has RPCed into us) then we cannot allow promotion. If on the other

deps/rabbit/src/rabbit_autoheal.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ enabled() ->
145145
end.
146146

147147
leader() ->
148-
[Leader | _] = lists:usort(rabbit_nodes:all()),
148+
[Leader | _] = lists:usort(rabbit_nodes:list_members()),
149149
Leader.
150150

151151
%% This is the winner receiving its last notification that a node has
@@ -411,7 +411,7 @@ partition_value(Partition) ->
411411
%% only know which nodes we have been partitioned from, not which
412412
%% nodes are partitioned from each other.
413413
check_other_nodes(LocalPartitions) ->
414-
Nodes = rabbit_nodes:all(),
414+
Nodes = rabbit_nodes:list_members(),
415415
{Results, Bad} = rabbit_node_monitor:status(Nodes -- [node()]),
416416
RemotePartitions = [{Node, proplists:get_value(partitions, Res)}
417417
|| {Node, Res} <- Results],

deps/rabbit/src/rabbit_channel.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ send_drained(Pid, CTagCredit) ->
369369
-spec list() -> [pid()].
370370

371371
list() ->
372-
Nodes = rabbit_nodes:all_running(),
372+
Nodes = rabbit_nodes:list_running(),
373373
rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_channel, list_local, [], ?RPC_TIMEOUT).
374374

375375
-spec list_local() -> [pid()].

deps/rabbit/src/rabbit_channel_tracking.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ list() ->
264264
lists:foldl(
265265
fun (Node, Acc) ->
266266
Acc ++ list_on_node(Node)
267-
end, [], rabbit_nodes:all_running()).
267+
end, [], rabbit_nodes:list_running()).
268268

269269
-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_channel()].
270270

@@ -309,7 +309,7 @@ list_on_node_mnesia(Node) ->
309309
#tracked_channel{_ = '_'})
310310
catch exit:{aborted, {no_exists, _}} ->
311311
%% The table might not exist yet (or is already gone)
312-
%% between the time rabbit_nodes:all_running() runs and
312+
%% between the time rabbit_nodes:list_running() runs and
313313
%% returns a specific node, and
314314
%% mnesia:dirty_match_object() is called for that node's
315315
%% table.

deps/rabbit/src/rabbit_connection_tracking.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ get_all_tracked_connection_table_names_for_node(Node) ->
435435
-spec lookup(rabbit_types:connection_name()) -> rabbit_types:tracked_connection() | 'not_found'.
436436

437437
lookup(Name) ->
438-
Nodes = rabbit_nodes:all_running(),
438+
Nodes = rabbit_nodes:list_running(),
439439
lookup(Name, Nodes).
440440

441441
lookup(_, []) ->
@@ -470,15 +470,15 @@ list() ->
470470
lists:foldl(
471471
fun (Node, Acc) ->
472472
Acc ++ list_on_node(Node)
473-
end, [], rabbit_nodes:all_running()).
473+
end, [], rabbit_nodes:list_running()).
474474

475475
-spec count() -> non_neg_integer().
476476

477477
count() ->
478478
lists:foldl(
479479
fun (Node, Acc) ->
480480
count_on_node(Node) + Acc
481-
end, 0, rabbit_nodes:all_running()).
481+
end, 0, rabbit_nodes:list_running()).
482482

483483
count_on_node(Node) ->
484484
case rabbit_feature_flags:is_enabled(tracking_records_in_ets) of
@@ -551,7 +551,7 @@ list_on_node_mnesia(Node) ->
551551
#tracked_connection{_ = '_'})
552552
catch exit:{aborted, {no_exists, _}} ->
553553
%% The table might not exist yet (or is already gone)
554-
%% between the time rabbit_nodes:all_running() runs and
554+
%% between the time rabbit_nodes:list_running() runs and
555555
%% returns a specific node, and
556556
%% mnesia:dirty_match_object() is called for that node's
557557
%% table.

deps/rabbit/src/rabbit_core_metrics_gc.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ gc_exchanges() ->
102102
gc_process_and_entity(channel_exchange_metrics, GbSet).
103103

104104
gc_nodes() ->
105-
Nodes = rabbit_nodes:all(),
105+
Nodes = rabbit_nodes:list_members(),
106106
GbSet = gb_sets:from_list(Nodes),
107107
gc_entity(node_node_metrics, GbSet).
108108

deps/rabbit/src/rabbit_direct.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ list_local() ->
4444
-spec list() -> [pid()].
4545

4646
list() ->
47-
Nodes = rabbit_nodes:all_running(),
47+
Nodes = rabbit_nodes:list_running(),
4848
rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_direct, list_local, [], ?RPC_TIMEOUT).
4949

5050
%%----------------------------------------------------------------------------

deps/rabbit/src/rabbit_maintenance.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ stop_local_quorum_queue_followers() ->
341341

342342
-spec primary_replica_transfer_candidate_nodes() -> [node()].
343343
primary_replica_transfer_candidate_nodes() ->
344-
filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running() -- [node()]).
344+
filter_out_drained_nodes_consistent_read(rabbit_nodes:list_running() -- [node()]).
345345

346346
-spec random_primary_replica_transfer_candidate_node([node()], [node()]) -> {ok, node()} | undefined.
347347
random_primary_replica_transfer_candidate_node([], _Preferred) ->

deps/rabbit/src/rabbit_mirror_queue_misc.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
154154
slaves_to_start_on_failure(Q, DeadGMPids) ->
155155
%% In case Mnesia has not caught up yet, filter out nodes we know
156156
%% to be dead..
157-
ClusterNodes = rabbit_nodes:all_running() --
157+
ClusterNodes = rabbit_nodes:list_running() --
158158
[node(P) || P <- DeadGMPids],
159159
{_, OldNodes, _} = actual_queue_nodes(Q),
160160
{_, NewNodes} = suggested_queue_nodes(Q, ClusterNodes),
@@ -321,7 +321,7 @@ store_updated_slaves(Q0) when ?is_amqqueue(Q0) ->
321321
%% a long time without being removed.
322322
update_recoverable(SPids, RS) ->
323323
SNodes = [node(SPid) || SPid <- SPids],
324-
RunningNodes = rabbit_nodes:all_running(),
324+
RunningNodes = rabbit_nodes:list_running(),
325325
AddNodes = SNodes -- RS,
326326
DelNodes = RunningNodes -- SNodes, %% i.e. running with no slave
327327
(RS -- DelNodes) ++ AddNodes.
@@ -375,17 +375,17 @@ promote_slave([SPid | SPids]) ->
375375
-spec initial_queue_node(amqqueue:amqqueue(), node()) -> node().
376376

377377
initial_queue_node(Q, DefNode) ->
378-
{MNode, _SNodes} = suggested_queue_nodes(Q, DefNode, rabbit_nodes:all_running()),
378+
{MNode, _SNodes} = suggested_queue_nodes(Q, DefNode, rabbit_nodes:list_running()),
379379
MNode.
380380

381381
-spec suggested_queue_nodes(amqqueue:amqqueue()) ->
382382
{node(), [node()]}.
383383

384-
suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, rabbit_nodes:all_running()).
384+
suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, rabbit_nodes:list_running()).
385385
suggested_queue_nodes(Q, All) -> suggested_queue_nodes(Q, node(), All).
386386

387387
%% The third argument exists so we can pull a call to
388-
%% rabbit_nodes:all_running() out of a loop or transaction
388+
%% rabbit_nodes:list_running() out of a loop or transaction
389389
%% or both.
390390
suggested_queue_nodes(Q, DefNode, All) when ?is_amqqueue(Q) ->
391391
Owner = amqqueue:get_exclusive_owner(Q),

deps/rabbit/src/rabbit_mnesia_rename.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ prepare(Node, NodeMapList) ->
100100

101101
%% Check that we are in the cluster, all old nodes are in the
102102
%% cluster, and no new nodes are.
103-
Nodes = rabbit_nodes:all(),
103+
Nodes = rabbit_nodes:list_members(),
104104
case {FromNodes -- Nodes, ToNodes -- (ToNodes -- Nodes),
105105
lists:member(Node, Nodes ++ ToNodes)} of
106106
{[], [], true} -> ok;
@@ -130,7 +130,7 @@ restore_backup(Backup) ->
130130
-spec maybe_finish() -> ok.
131131

132132
maybe_finish() ->
133-
AllNodes = rabbit_nodes:all(),
133+
AllNodes = rabbit_nodes:list_members(),
134134
maybe_finish(AllNodes).
135135

136136
-spec maybe_finish([node()]) -> 'ok'.
@@ -144,7 +144,7 @@ maybe_finish(AllNodes) ->
144144
finish(FromNode, ToNode, AllNodes) ->
145145
case node() of
146146
ToNode ->
147-
case rabbit_nodes:filter_nodes_running_rabbitmq(AllNodes) of
147+
case rabbit_nodes:filter_running(AllNodes) of
148148
[] -> finish_primary(FromNode, ToNode);
149149
_ -> finish_secondary(FromNode, ToNode, AllNodes)
150150
end;
@@ -257,8 +257,8 @@ update_term(_NodeMap, Term) ->
257257
Term.
258258

259259
rename_in_running_mnesia(FromNode, ToNode) ->
260-
All = rabbit_nodes:all(),
261-
Running = rabbit_nodes:all_running(),
260+
All = rabbit_nodes:list_members(),
261+
Running = rabbit_nodes:list_running(),
262262
case {lists:member(FromNode, Running), lists:member(ToNode, All)} of
263263
{false, true} -> ok;
264264
{true, _} -> exit({old_node_running, FromNode});

0 commit comments

Comments
 (0)