Skip to content

Commit 74ca25b

Browse files
committed
rabbit_nodes: Add list functions to clarify which nodes we are interested in
So far, we had the following functions to list nodes in a RabbitMQ cluster: * `rabbit_mnesia:cluster_nodes/1` to get members of the Mnesia cluster; the argument was used to select members (all members or only those running Mnesia and participating in the cluster) * `rabbit_nodes:all/0` to get all members of the Mnesia cluster * `rabbit_nodes:all_running/0` to get all members who currently run Mnesia Basically: * `rabbit_nodes:all/0` calls `rabbit_mnesia:cluster_nodes(all)` * `rabbit_nodes:all_running/0` calls `rabbit_mnesia:cluster_nodes(running)` We also have: * `rabbit_node_monitor:alive_nodes/1` which filters the given list of nodes to only select those currently running Mnesia * `rabbit_node_monitor:alive_rabbit_nodes/1` which filters the given list of nodes to only select those currently running RabbitMQ Most of the code uses `rabbit_mnesia:cluster_nodes/1` or the `rabbit_nodes:all*/0` functions. `rabbit_mnesia:cluster_nodes(running)` or `rabbit_nodes:all_running/0` is often used as a close approximation of "all cluster members running RabbitMQ". This list might be incorrect at times when a node is joining the cluster or is being worked on (i.e. Mnesia is running but not RabbitMQ). With Khepri, there won't be the same possible approximation because we will try to keep Khepri/Ra running even if RabbitMQ is stopped to expand/shrink the cluster. 
So in order to clarify what we want when we query a list of nodes, this patch introduces the following functions: * `rabbit_nodes:list_members/0` to get all cluster members, regardless of their state * `rabbit_nodes:list_reachable/0` to get all cluster members we can reach using Erlang distribution, regardless of the state of RabbitMQ * `rabbit_nodes:list_running/0` to get all cluster members who run RabbitMQ, regardless of the maintenance state * `rabbit_nodes:list_serving/0` to get all cluster members who run RabbitMQ and are accepting clients In addition to the list functions, there are the corresponding `rabbit_nodes:filter_*(Nodes)` filtering functions. The code is modified to use these new functions. One possible significant change is that the new list functions will perform RPC calls to query the nodes' state, unlike `rabbit_mnesia:cluster_nodes(running)`.
1 parent 1d77856 commit 74ca25b

23 files changed

+419
-88
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ rebalance(Type, VhostSpec, QueueSpec) ->
441441
maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
442442
rabbit_log:info("Starting queue rebalance operation: '~ts' for vhosts matching '~ts' and queues matching '~ts'",
443443
[Type, VhostSpec, QueueSpec]),
444-
Running = rabbit_maintenance:filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running()),
444+
Running = rabbit_maintenance:filter_out_drained_nodes_consistent_read(rabbit_nodes:list_running()),
445445
NumRunning = length(Running),
446446
ToRebalance = [Q || Q <- rabbit_amqqueue:list(),
447447
filter_per_type(Type, Q),
@@ -1121,7 +1121,7 @@ list() ->
11211121

11221122
do_list() ->
11231123
All = mnesia:dirty_match_object(rabbit_queue, amqqueue:pattern_match_all()),
1124-
NodesRunning = rabbit_nodes:all_running(),
1124+
NodesRunning = rabbit_nodes:list_running(),
11251125
lists:filter(fun (Q) ->
11261126
Pid = amqqueue:get_pid(Q),
11271127
St = amqqueue:get_state(Q),
@@ -1307,7 +1307,7 @@ is_in_virtual_host(Q, VHostName) ->
13071307
-spec list(vhost:name()) -> [amqqueue:amqqueue()].
13081308
list(VHostPath) ->
13091309
All = list(VHostPath, rabbit_queue),
1310-
NodesRunning = rabbit_nodes:all_running(),
1310+
NodesRunning = rabbit_nodes:list_running(),
13111311
lists:filter(fun (Q) ->
13121312
Pid = amqqueue:get_pid(Q),
13131313
St = amqqueue:get_state(Q),
@@ -1368,7 +1368,7 @@ list_down(VHostPath) ->
13681368
true ->
13691369
Alive = sets:from_list([amqqueue:get_name(Q) || Q <- list(VHostPath)]),
13701370
Durable = list(VHostPath, rabbit_durable_queue),
1371-
NodesRunning = rabbit_nodes:all_running(),
1371+
NodesRunning = rabbit_nodes:list_running(),
13721372
lists:filter(fun (Q) ->
13731373
N = amqqueue:get_name(Q),
13741374
Pid = amqqueue:get_pid(Q),
@@ -1488,7 +1488,7 @@ emit_info_all(Nodes, VHostPath, Items, Ref, AggregatorPid) ->
14881488
rabbit_control_misc:await_emitters_termination(Pids).
14891489

14901490
collect_info_all(VHostPath, Items) ->
1491-
Nodes = rabbit_nodes:all_running(),
1491+
Nodes = rabbit_nodes:list_running(),
14921492
Ref = make_ref(),
14931493
Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, self()]) || Node <- Nodes ],
14941494
rabbit_control_misc:await_emitters_termination(Pids),
@@ -1904,10 +1904,8 @@ forget_node_for_queue(DeadNode, [H|T], Q) when ?is_amqqueue(Q) ->
19041904
node_permits_offline_promotion(Node) ->
19051905
case node() of
19061906
Node -> not rabbit:is_running(); %% [1]
1907-
_ -> All = rabbit_nodes:all(),
1908-
Running = rabbit_nodes:all_running(),
1909-
lists:member(Node, All) andalso
1910-
not lists:member(Node, Running) %% [2]
1907+
_ -> NotRunning = rabbit_nodes:list_not_running(),
1908+
lists:member(Node, NotRunning) %% [2]
19111909
end.
19121910
%% [1] In this case if we are a real running node (i.e. rabbitmqctl
19131911
%% has RPCed into us) then we cannot allow promotion. If on the other

deps/rabbit/src/rabbit_channel.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ send_drained(Pid, CTagCredit) ->
371371
-spec list() -> [pid()].
372372

373373
list() ->
374-
Nodes = rabbit_nodes:all_running(),
374+
Nodes = rabbit_nodes:list_running(),
375375
rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_channel, list_local, [], ?RPC_TIMEOUT).
376376

377377
-spec list_local() -> [pid()].

deps/rabbit/src/rabbit_channel_tracking.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ list() ->
262262
lists:foldl(
263263
fun (Node, Acc) ->
264264
Acc ++ list_on_node(Node)
265-
end, [], rabbit_nodes:all_running()).
265+
end, [], rabbit_nodes:list_running()).
266266

267267
-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_channel()].
268268

@@ -307,7 +307,7 @@ list_on_node_mnesia(Node) ->
307307
#tracked_channel{_ = '_'})
308308
catch exit:{aborted, {no_exists, _}} ->
309309
%% The table might not exist yet (or is already gone)
310-
%% between the time rabbit_nodes:all_running() runs and
310+
%% between the time rabbit_nodes:list_running() runs and
311311
%% returns a specific node, and
312312
%% mnesia:dirty_match_object() is called for that node's
313313
%% table.

deps/rabbit/src/rabbit_connection_tracking.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ get_all_tracked_connection_table_names_for_node(Node) ->
434434
-spec lookup(rabbit_types:connection_name()) -> rabbit_types:tracked_connection() | 'not_found'.
435435

436436
lookup(Name) ->
437-
Nodes = rabbit_nodes:all_running(),
437+
Nodes = rabbit_nodes:list_running(),
438438
lookup(Name, Nodes).
439439

440440
lookup(_, []) ->
@@ -469,15 +469,15 @@ list() ->
469469
lists:foldl(
470470
fun (Node, Acc) ->
471471
Acc ++ list_on_node(Node)
472-
end, [], rabbit_nodes:all_running()).
472+
end, [], rabbit_nodes:list_running()).
473473

474474
-spec count() -> non_neg_integer().
475475

476476
count() ->
477477
lists:foldl(
478478
fun (Node, Acc) ->
479479
count_on_node(Node) + Acc
480-
end, 0, rabbit_nodes:all_running()).
480+
end, 0, rabbit_nodes:list_running()).
481481

482482
count_on_node(Node) ->
483483
case rabbit_feature_flags:is_enabled(tracking_records_in_ets) of
@@ -550,7 +550,7 @@ list_on_node_mnesia(Node) ->
550550
#tracked_connection{_ = '_'})
551551
catch exit:{aborted, {no_exists, _}} ->
552552
%% The table might not exist yet (or is already gone)
553-
%% between the time rabbit_nodes:all_running() runs and
553+
%% between the time rabbit_nodes:list_running() runs and
554554
%% returns a specific node, and
555555
%% mnesia:dirty_match_object() is called for that node's
556556
%% table.

deps/rabbit/src/rabbit_direct.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ list_local() ->
4444
-spec list() -> [pid()].
4545

4646
list() ->
47-
Nodes = rabbit_nodes:all_running(),
47+
Nodes = rabbit_nodes:list_running(),
4848
rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_direct, list_local, [], ?RPC_TIMEOUT).
4949

5050
%%----------------------------------------------------------------------------

deps/rabbit/src/rabbit_maintenance.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ stop_local_quorum_queue_followers() ->
341341

342342
-spec primary_replica_transfer_candidate_nodes() -> [node()].
343343
primary_replica_transfer_candidate_nodes() ->
344-
filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running() -- [node()]).
344+
filter_out_drained_nodes_consistent_read(rabbit_nodes:list_running() -- [node()]).
345345

346346
-spec random_primary_replica_transfer_candidate_node([node()], [node()]) -> {ok, node()} | undefined.
347347
random_primary_replica_transfer_candidate_node([], _Preferred) ->

deps/rabbit/src/rabbit_mirror_queue_misc.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
154154
slaves_to_start_on_failure(Q, DeadGMPids) ->
155155
%% In case Mnesia has not caught up yet, filter out nodes we know
156156
%% to be dead..
157-
ClusterNodes = rabbit_nodes:all_running() --
157+
ClusterNodes = rabbit_nodes:list_running() --
158158
[node(P) || P <- DeadGMPids],
159159
{_, OldNodes, _} = actual_queue_nodes(Q),
160160
{_, NewNodes} = suggested_queue_nodes(Q, ClusterNodes),
@@ -321,7 +321,7 @@ store_updated_slaves(Q0) when ?is_amqqueue(Q0) ->
321321
%% a long time without being removed.
322322
update_recoverable(SPids, RS) ->
323323
SNodes = [node(SPid) || SPid <- SPids],
324-
RunningNodes = rabbit_nodes:all_running(),
324+
RunningNodes = rabbit_nodes:list_running(),
325325
AddNodes = SNodes -- RS,
326326
DelNodes = RunningNodes -- SNodes, %% i.e. running with no slave
327327
(RS -- DelNodes) ++ AddNodes.
@@ -375,17 +375,17 @@ promote_slave([SPid | SPids]) ->
375375
-spec initial_queue_node(amqqueue:amqqueue(), node()) -> node().
376376

377377
initial_queue_node(Q, DefNode) ->
378-
{MNode, _SNodes} = suggested_queue_nodes(Q, DefNode, rabbit_nodes:all_running()),
378+
{MNode, _SNodes} = suggested_queue_nodes(Q, DefNode, rabbit_nodes:list_running()),
379379
MNode.
380380

381381
-spec suggested_queue_nodes(amqqueue:amqqueue()) ->
382382
{node(), [node()]}.
383383

384-
suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, rabbit_nodes:all_running()).
384+
suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, rabbit_nodes:list_running()).
385385
suggested_queue_nodes(Q, All) -> suggested_queue_nodes(Q, node(), All).
386386

387387
%% The third argument exists so we can pull a call to
388-
%% rabbit_nodes:all_running() out of a loop or transaction
388+
%% rabbit_nodes:list_running() out of a loop or transaction
389389
%% or both.
390390
suggested_queue_nodes(Q, DefNode, All) when ?is_amqqueue(Q) ->
391391
Owner = amqqueue:get_exclusive_owner(Q),

deps/rabbit/src/rabbit_mnesia_rename.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ prepare(Node, NodeMapList) ->
100100

101101
%% Check that we are in the cluster, all old nodes are in the
102102
%% cluster, and no new nodes are.
103-
Nodes = rabbit_nodes:all(),
103+
Nodes = rabbit_nodes:list_members(),
104104
case {FromNodes -- Nodes, ToNodes -- (ToNodes -- Nodes),
105105
lists:member(Node, Nodes ++ ToNodes)} of
106106
{[], [], true} -> ok;
@@ -130,7 +130,7 @@ restore_backup(Backup) ->
130130
-spec maybe_finish() -> ok.
131131

132132
maybe_finish() ->
133-
AllNodes = rabbit_nodes:all(),
133+
AllNodes = rabbit_nodes:list_members(),
134134
maybe_finish(AllNodes).
135135

136136
-spec maybe_finish([node()]) -> 'ok'.
@@ -144,7 +144,7 @@ maybe_finish(AllNodes) ->
144144
finish(FromNode, ToNode, AllNodes) ->
145145
case node() of
146146
ToNode ->
147-
case rabbit_nodes:filter_nodes_running_rabbitmq(AllNodes) of
147+
case rabbit_nodes:filter_running(AllNodes) of
148148
[] -> finish_primary(FromNode, ToNode);
149149
_ -> finish_secondary(FromNode, ToNode, AllNodes)
150150
end;
@@ -257,8 +257,8 @@ update_term(_NodeMap, Term) ->
257257
Term.
258258

259259
rename_in_running_mnesia(FromNode, ToNode) ->
260-
All = rabbit_nodes:all(),
261-
Running = rabbit_nodes:all_running(),
260+
All = rabbit_nodes:list_members(),
261+
Running = rabbit_nodes:list_running(),
262262
case {lists:member(FromNode, Running), lists:member(ToNode, All)} of
263263
{false, true} -> ok;
264264
{true, _} -> exit({old_node_running, FromNode});

deps/rabbit/src/rabbit_networking.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,7 @@ unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid).
517517
-spec connections() -> [rabbit_types:connection()].
518518

519519
connections() ->
520-
Nodes = rabbit_nodes:all_running(),
520+
Nodes = rabbit_nodes:list_running(),
521521
rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_networking, connections_local, [], ?RPC_TIMEOUT).
522522

523523
-spec local_connections() -> [rabbit_types:connection()].
@@ -540,7 +540,7 @@ unregister_non_amqp_connection(Pid) -> pg_local:leave(rabbit_non_amqp_connection
540540
-spec non_amqp_connections() -> [rabbit_types:connection()].
541541

542542
non_amqp_connections() ->
543-
Nodes = rabbit_nodes:all_running(),
543+
Nodes = rabbit_nodes:list_running(),
544544
rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_networking, local_non_amqp_connections, [], ?RPC_TIMEOUT).
545545

546546
-spec local_non_amqp_connections() -> [rabbit_types:connection()].

deps/rabbit/src/rabbit_node_monitor.erl

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ notify_node_up() ->
163163

164164
notify_joined_cluster() ->
165165
NewMember = node(),
166-
Nodes = rabbit_nodes:all_running() -- [NewMember],
166+
Nodes = rabbit_nodes:list_running() -- [NewMember],
167167
gen_server:abcast(Nodes, ?SERVER,
168168
{joined_cluster, node(), rabbit_db_cluster:node_type()}),
169169

@@ -172,7 +172,7 @@ notify_joined_cluster() ->
172172
-spec notify_left_cluster(node()) -> 'ok'.
173173

174174
notify_left_cluster(Node) ->
175-
Nodes = rabbit_nodes:all_running(),
175+
Nodes = rabbit_nodes:list_running(),
176176
gen_server:abcast(Nodes, ?SERVER, {left_cluster, Node}),
177177
ok.
178178

@@ -409,7 +409,7 @@ handle_call(_Request, _From, State) ->
409409
{noreply, State}.
410410

411411
handle_cast(notify_node_up, State = #state{guid = GUID}) ->
412-
Nodes = rabbit_nodes:all_running() -- [node()],
412+
Nodes = rabbit_nodes:list_running() -- [node()],
413413
gen_server:abcast(Nodes, ?SERVER,
414414
{node_up, node(), rabbit_db_cluster:node_type(), GUID}),
415415
%% register other active rabbits with this rabbit
@@ -466,7 +466,7 @@ handle_cast({announce_guid, Node, GUID}, State = #state{node_guids = GUIDs}) ->
466466
handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID},
467467
State = #state{guid = MyGUID,
468468
node_guids = GUIDs}) ->
469-
case lists:member(Node, rabbit_nodes:all_running()) andalso
469+
case lists:member(Node, rabbit_nodes:list_members()) andalso
470470
maps:find(Node, GUIDs) =:= {ok, NodeGUID} of
471471
true -> spawn_link( %%[1]
472472
fun () ->
@@ -618,7 +618,7 @@ handle_info({nodedown, Node, Info}, State = #state{guid = MyGUID,
618618
Node, node(), DownGUID, CheckGUID, MyGUID})
619619
end,
620620
case maps:find(Node, GUIDs) of
621-
{ok, DownGUID} -> Alive = rabbit_nodes:all_running()
621+
{ok, DownGUID} -> Alive = rabbit_nodes:list_members()
622622
-- [node(), Node],
623623
[case maps:find(N, GUIDs) of
624624
{ok, CheckGUID} -> Check(N, CheckGUID, DownGUID);
@@ -817,7 +817,7 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions,
817817
%% going away. It's only safe to forget anything about partitions when
818818
%% there are no partitions.
819819
Down = Partitions -- alive_rabbit_nodes(),
820-
NoLongerPartitioned = rabbit_nodes:all_running(),
820+
NoLongerPartitioned = rabbit_nodes:list_running(),
821821
Partitions1 = case Partitions -- Down -- NoLongerPartitioned of
822822
[] -> [];
823823
_ -> Partitions
@@ -898,8 +898,7 @@ disconnect(Node) ->
898898

899899
%%--------------------------------------------------------------------
900900

901-
%% mnesia:system_info(db_nodes) (and hence
902-
%% rabbit_nodes:all_running()) does not return all nodes
901+
%% mnesia:system_info(db_nodes) does not return all nodes
903902
%% when partitioned, just those that we are sharing Mnesia state
904903
%% with. So we have a small set of replacement functions
905904
%% here. "rabbit" in a function's name implies we test if the rabbit
@@ -914,7 +913,7 @@ majority() ->
914913
majority([]).
915914

916915
majority(NodesDown) ->
917-
Nodes = rabbit_nodes:all(),
916+
Nodes = rabbit_nodes:list_members(),
918917
AliveNodes = alive_nodes(Nodes) -- NodesDown,
919918
length(AliveNodes) / length(Nodes) > 0.5.
920919

@@ -927,44 +926,44 @@ in_preferred_partition(PreferredNodes) ->
927926
in_preferred_partition(PreferredNodes, []).
928927

929928
in_preferred_partition(PreferredNodes, NodesDown) ->
930-
Nodes = rabbit_nodes:all(),
929+
Nodes = rabbit_nodes:list_members(),
931930
RealPreferredNodes = [N || N <- PreferredNodes, lists:member(N, Nodes)],
932931
AliveNodes = alive_nodes(RealPreferredNodes) -- NodesDown,
933932
RealPreferredNodes =:= [] orelse AliveNodes =/= [].
934933

935934
all_nodes_up() ->
936-
Nodes = rabbit_nodes:all(),
935+
Nodes = rabbit_nodes:list_members(),
937936
length(alive_nodes(Nodes)) =:= length(Nodes).
938937

939938
-spec all_rabbit_nodes_up() -> boolean().
940939

941940
all_rabbit_nodes_up() ->
942-
Nodes = rabbit_nodes:all(),
941+
Nodes = rabbit_nodes:list_members(),
943942
length(alive_rabbit_nodes(Nodes)) =:= length(Nodes).
944943

945-
alive_nodes() -> alive_nodes(rabbit_nodes:all()).
944+
alive_nodes() -> rabbit_nodes:list_reachable().
946945

947946
-spec alive_nodes([node()]) -> [node()].
948947

949-
alive_nodes(Nodes) -> [N || N <- Nodes, lists:member(N, [node()|nodes()])].
948+
alive_nodes(Nodes) -> rabbit_nodes:filter_reachable(Nodes).
950949

951-
alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_nodes:all()).
950+
alive_rabbit_nodes() -> rabbit_nodes:list_running().
952951

953952
-spec alive_rabbit_nodes([node()]) -> [node()].
954953

955954
alive_rabbit_nodes(Nodes) ->
956-
[N || N <- alive_nodes(Nodes), rabbit:is_running(N)].
955+
rabbit_nodes:filter_running(Nodes).
957956

958957
%% This one is allowed to connect!
959958

960959
-spec ping_all() -> 'ok'.
961960

962961
ping_all() ->
963-
[net_adm:ping(N) || N <- rabbit_nodes:all()],
962+
[net_adm:ping(N) || N <- rabbit_nodes:list_members()],
964963
ok.
965964

966965
possibly_partitioned_nodes() ->
967-
alive_rabbit_nodes() -- rabbit_nodes:all_running().
966+
rabbit_nodes:list_unreachable().
968967

969968
startup_log([]) ->
970969
rabbit_log:info("Starting rabbit_node_monitor", []);

0 commit comments

Comments
 (0)