Skip to content

Commit 18156b3

Browse files
committed
rabbit_nodes: Add list functions to clarify which nodes we are interested in
So far, we had the following functions to list nodes in a RabbitMQ cluster: * `rabbit_mnesia:cluster_nodes/1` to get members of the Mnesia cluster; the argument was used to select members (all members or only those running Mnesia and participating in the cluster) * `rabbit_nodes:all/0` to get all members of the Mnesia cluster * `rabbit_nodes:all_running/0` to get all members who currently run Mnesia Basically: * `rabbit_nodes:all/0` calls `rabbit_mnesia:cluster_nodes(all)` * `rabbit_nodes:all_running/0` calls `rabbit_mnesia:cluster_nodes(running)` We also have: * `rabbit_node_monitor:alive_nodes/1` which filters the given list of nodes to only select those currently running Mnesia * `rabbit_node_monitor:alive_rabbit_nodes/1` which filters the given list of nodes to only select those currently running RabbitMQ Most of the code uses `rabbit_mnesia:cluster_nodes/1` or the `rabbit_nodes:all*/0` functions. `rabbit_mnesia:cluster_nodes(running)` or `rabbit_nodes:all_running/0` is often used as a close approximation of "all cluster members running RabbitMQ". This list might be incorrect in times where a node is joining the clustered or is being worked on (i.e. Mnesia is running but not RabbitMQ). With Khepri, there won't be the same possible approximation because we will try to keep Khepri/Ra running even if RabbitMQ is stopped to expand/shrink the cluster. So in order to clarify what we want when we query a list of nodes, this patch introduces the following functions: * `rabbit_nodes:list_members/0` to get all cluster members, regardless of their state * `rabbit_nodes:list_reachable/0` to get all cluster members we can reach using Erlang distribution, regardless of the state of RabbitMQ * `rabbit_nodes:list_running/0` to get all cluster members who run RabbitMQ, regardless of the maintenance state * `rabbit_nodes:list_serving/0` to get all cluster members who run RabbitMQ and are accepting clients In addition to the list functions, there are the corresponding `rabbit_nodes:filter_*(Nodes)` filtering functions. The code is modified to use these new functions. One possible significant change is that the new list functions will perform RPC calls to query the nodes' state, unlike `rabbit_mnesia:cluster_nodes(running)`.
1 parent ad24db2 commit 18156b3

31 files changed

+432
-100
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ rebalance(Type, VhostSpec, QueueSpec) ->
441441
maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
442442
rabbit_log:info("Starting queue rebalance operation: '~ts' for vhosts matching '~ts' and queues matching '~ts'",
443443
[Type, VhostSpec, QueueSpec]),
444-
Running = rabbit_maintenance:filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running()),
444+
Running = rabbit_maintenance:filter_out_drained_nodes_consistent_read(rabbit_nodes:list_running()),
445445
NumRunning = length(Running),
446446
ToRebalance = [Q || Q <- rabbit_amqqueue:list(),
447447
filter_per_type(Type, Q),
@@ -1121,7 +1121,7 @@ list() ->
11211121

11221122
do_list() ->
11231123
All = mnesia:dirty_match_object(rabbit_queue, amqqueue:pattern_match_all()),
1124-
NodesRunning = rabbit_nodes:all_running(),
1124+
NodesRunning = rabbit_nodes:list_running(),
11251125
lists:filter(fun (Q) ->
11261126
Pid = amqqueue:get_pid(Q),
11271127
St = amqqueue:get_state(Q),
@@ -1307,7 +1307,7 @@ is_in_virtual_host(Q, VHostName) ->
13071307
-spec list(vhost:name()) -> [amqqueue:amqqueue()].
13081308
list(VHostPath) ->
13091309
All = list(VHostPath, rabbit_queue),
1310-
NodesRunning = rabbit_nodes:all_running(),
1310+
NodesRunning = rabbit_nodes:list_running(),
13111311
lists:filter(fun (Q) ->
13121312
Pid = amqqueue:get_pid(Q),
13131313
St = amqqueue:get_state(Q),
@@ -1368,7 +1368,7 @@ list_down(VHostPath) ->
13681368
true ->
13691369
Alive = sets:from_list([amqqueue:get_name(Q) || Q <- list(VHostPath)]),
13701370
Durable = list(VHostPath, rabbit_durable_queue),
1371-
NodesRunning = rabbit_nodes:all_running(),
1371+
NodesRunning = rabbit_nodes:list_running(),
13721372
lists:filter(fun (Q) ->
13731373
N = amqqueue:get_name(Q),
13741374
Pid = amqqueue:get_pid(Q),
@@ -1488,7 +1488,7 @@ emit_info_all(Nodes, VHostPath, Items, Ref, AggregatorPid) ->
14881488
rabbit_control_misc:await_emitters_termination(Pids).
14891489

14901490
collect_info_all(VHostPath, Items) ->
1491-
Nodes = rabbit_nodes:all_running(),
1491+
Nodes = rabbit_nodes:list_running(),
14921492
Ref = make_ref(),
14931493
Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, self()]) || Node <- Nodes ],
14941494
rabbit_control_misc:await_emitters_termination(Pids),
@@ -1904,10 +1904,8 @@ forget_node_for_queue(DeadNode, [H|T], Q) when ?is_amqqueue(Q) ->
19041904
node_permits_offline_promotion(Node) ->
19051905
case node() of
19061906
Node -> not rabbit:is_running(); %% [1]
1907-
_ -> All = rabbit_nodes:all(),
1908-
Running = rabbit_nodes:all_running(),
1909-
lists:member(Node, All) andalso
1910-
not lists:member(Node, Running) %% [2]
1907+
_ -> NotRunning = rabbit_nodes:list_not_running(),
1908+
lists:member(Node, NotRunning) %% [2]
19111909
end.
19121910
%% [1] In this case if we are a real running node (i.e. rabbitmqctl
19131911
%% has RPCed into us) then we cannot allow promotion. If on the other

deps/rabbit/src/rabbit_channel.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ send_drained(Pid, CTagCredit) ->
369369
-spec list() -> [pid()].
370370

371371
list() ->
372-
Nodes = rabbit_nodes:all_running(),
372+
Nodes = rabbit_nodes:list_running(),
373373
rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_channel, list_local, [], ?RPC_TIMEOUT).
374374

375375
-spec list_local() -> [pid()].

deps/rabbit/src/rabbit_channel_tracking.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ list() ->
264264
lists:foldl(
265265
fun (Node, Acc) ->
266266
Acc ++ list_on_node(Node)
267-
end, [], rabbit_nodes:all_running()).
267+
end, [], rabbit_nodes:list_running()).
268268

269269
-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_channel()].
270270

@@ -309,7 +309,7 @@ list_on_node_mnesia(Node) ->
309309
#tracked_channel{_ = '_'})
310310
catch exit:{aborted, {no_exists, _}} ->
311311
%% The table might not exist yet (or is already gone)
312-
%% between the time rabbit_nodes:all_running() runs and
312+
%% between the time rabbit_nodes:list_running() runs and
313313
%% returns a specific node, and
314314
%% mnesia:dirty_match_object() is called for that node's
315315
%% table.

deps/rabbit/src/rabbit_connection_tracking.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ get_all_tracked_connection_table_names_for_node(Node) ->
435435
-spec lookup(rabbit_types:connection_name()) -> rabbit_types:tracked_connection() | 'not_found'.
436436

437437
lookup(Name) ->
438-
Nodes = rabbit_nodes:all_running(),
438+
Nodes = rabbit_nodes:list_running(),
439439
lookup(Name, Nodes).
440440

441441
lookup(_, []) ->
@@ -470,15 +470,15 @@ list() ->
470470
lists:foldl(
471471
fun (Node, Acc) ->
472472
Acc ++ list_on_node(Node)
473-
end, [], rabbit_nodes:all_running()).
473+
end, [], rabbit_nodes:list_running()).
474474

475475
-spec count() -> non_neg_integer().
476476

477477
count() ->
478478
lists:foldl(
479479
fun (Node, Acc) ->
480480
count_on_node(Node) + Acc
481-
end, 0, rabbit_nodes:all_running()).
481+
end, 0, rabbit_nodes:list_running()).
482482

483483
count_on_node(Node) ->
484484
case rabbit_feature_flags:is_enabled(tracking_records_in_ets) of
@@ -551,7 +551,7 @@ list_on_node_mnesia(Node) ->
551551
#tracked_connection{_ = '_'})
552552
catch exit:{aborted, {no_exists, _}} ->
553553
%% The table might not exist yet (or is already gone)
554-
%% between the time rabbit_nodes:all_running() runs and
554+
%% between the time rabbit_nodes:list_running() runs and
555555
%% returns a specific node, and
556556
%% mnesia:dirty_match_object() is called for that node's
557557
%% table.

deps/rabbit/src/rabbit_direct.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ list_local() ->
4444
-spec list() -> [pid()].
4545

4646
list() ->
47-
Nodes = rabbit_nodes:all_running(),
47+
Nodes = rabbit_nodes:list_running(),
4848
rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_direct, list_local, [], ?RPC_TIMEOUT).
4949

5050
%%----------------------------------------------------------------------------

deps/rabbit/src/rabbit_maintenance.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ stop_local_quorum_queue_followers() ->
341341

342342
-spec primary_replica_transfer_candidate_nodes() -> [node()].
343343
primary_replica_transfer_candidate_nodes() ->
344-
filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running() -- [node()]).
344+
filter_out_drained_nodes_consistent_read(rabbit_nodes:list_running() -- [node()]).
345345

346346
-spec random_primary_replica_transfer_candidate_node([node()], [node()]) -> {ok, node()} | undefined.
347347
random_primary_replica_transfer_candidate_node([], _Preferred) ->

deps/rabbit/src/rabbit_mirror_queue_misc.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
154154
slaves_to_start_on_failure(Q, DeadGMPids) ->
155155
%% In case Mnesia has not caught up yet, filter out nodes we know
156156
%% to be dead..
157-
ClusterNodes = rabbit_nodes:all_running() --
157+
ClusterNodes = rabbit_nodes:list_running() --
158158
[node(P) || P <- DeadGMPids],
159159
{_, OldNodes, _} = actual_queue_nodes(Q),
160160
{_, NewNodes} = suggested_queue_nodes(Q, ClusterNodes),
@@ -321,7 +321,7 @@ store_updated_slaves(Q0) when ?is_amqqueue(Q0) ->
321321
%% a long time without being removed.
322322
update_recoverable(SPids, RS) ->
323323
SNodes = [node(SPid) || SPid <- SPids],
324-
RunningNodes = rabbit_nodes:all_running(),
324+
RunningNodes = rabbit_nodes:list_running(),
325325
AddNodes = SNodes -- RS,
326326
DelNodes = RunningNodes -- SNodes, %% i.e. running with no slave
327327
(RS -- DelNodes) ++ AddNodes.
@@ -375,17 +375,17 @@ promote_slave([SPid | SPids]) ->
375375
-spec initial_queue_node(amqqueue:amqqueue(), node()) -> node().
376376

377377
initial_queue_node(Q, DefNode) ->
378-
{MNode, _SNodes} = suggested_queue_nodes(Q, DefNode, rabbit_nodes:all_running()),
378+
{MNode, _SNodes} = suggested_queue_nodes(Q, DefNode, rabbit_nodes:list_running()),
379379
MNode.
380380

381381
-spec suggested_queue_nodes(amqqueue:amqqueue()) ->
382382
{node(), [node()]}.
383383

384-
suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, rabbit_nodes:all_running()).
384+
suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, rabbit_nodes:list_running()).
385385
suggested_queue_nodes(Q, All) -> suggested_queue_nodes(Q, node(), All).
386386

387387
%% The third argument exists so we can pull a call to
388-
%% rabbit_nodes:all_running() out of a loop or transaction
388+
%% rabbit_nodes:list_running() out of a loop or transaction
389389
%% or both.
390390
suggested_queue_nodes(Q, DefNode, All) when ?is_amqqueue(Q) ->
391391
Owner = amqqueue:get_exclusive_owner(Q),

deps/rabbit/src/rabbit_mnesia_rename.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ prepare(Node, NodeMapList) ->
100100

101101
%% Check that we are in the cluster, all old nodes are in the
102102
%% cluster, and no new nodes are.
103-
Nodes = rabbit_nodes:all(),
103+
Nodes = rabbit_nodes:list_members(),
104104
case {FromNodes -- Nodes, ToNodes -- (ToNodes -- Nodes),
105105
lists:member(Node, Nodes ++ ToNodes)} of
106106
{[], [], true} -> ok;
@@ -130,7 +130,7 @@ restore_backup(Backup) ->
130130
-spec maybe_finish() -> ok.
131131

132132
maybe_finish() ->
133-
AllNodes = rabbit_nodes:all(),
133+
AllNodes = rabbit_nodes:list_members(),
134134
maybe_finish(AllNodes).
135135

136136
-spec maybe_finish([node()]) -> 'ok'.
@@ -144,7 +144,7 @@ maybe_finish(AllNodes) ->
144144
finish(FromNode, ToNode, AllNodes) ->
145145
case node() of
146146
ToNode ->
147-
case rabbit_nodes:filter_nodes_running_rabbitmq(AllNodes) of
147+
case rabbit_nodes:filter_running(AllNodes) of
148148
[] -> finish_primary(FromNode, ToNode);
149149
_ -> finish_secondary(FromNode, ToNode, AllNodes)
150150
end;
@@ -257,8 +257,8 @@ update_term(_NodeMap, Term) ->
257257
Term.
258258

259259
rename_in_running_mnesia(FromNode, ToNode) ->
260-
All = rabbit_nodes:all(),
261-
Running = rabbit_nodes:all_running(),
260+
All = rabbit_nodes:list_members(),
261+
Running = rabbit_nodes:list_running(),
262262
case {lists:member(FromNode, Running), lists:member(ToNode, All)} of
263263
{false, true} -> ok;
264264
{true, _} -> exit({old_node_running, FromNode});

deps/rabbit/src/rabbit_networking.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ maybe_get_epmd_port(Name, Host) ->
463463
-spec active_listeners() -> [rabbit_types:listener()].
464464

465465
active_listeners() ->
466-
Nodes = rabbit_mnesia:cluster_nodes(running),
466+
Nodes = rabbit_nodes:list_running(),
467467
lists:append([node_listeners(Node) || Node <- Nodes]).
468468

469469
-spec node_listeners(node()) -> [rabbit_types:listener()].
@@ -529,7 +529,7 @@ unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid).
529529
-spec connections() -> [rabbit_types:connection()].
530530

531531
connections() ->
532-
Nodes = rabbit_nodes:all_running(),
532+
Nodes = rabbit_nodes:list_running(),
533533
rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_networking, connections_local, [], ?RPC_TIMEOUT).
534534

535535
-spec local_connections() -> [rabbit_types:connection()].
@@ -552,7 +552,7 @@ unregister_non_amqp_connection(Pid) -> pg_local:leave(rabbit_non_amqp_connection
552552
-spec non_amqp_connections() -> [rabbit_types:connection()].
553553

554554
non_amqp_connections() ->
555-
Nodes = rabbit_nodes:all_running(),
555+
Nodes = rabbit_nodes:list_running(),
556556
rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_networking, local_non_amqp_connections, [], ?RPC_TIMEOUT).
557557

558558
-spec local_non_amqp_connections() -> [rabbit_types:connection()].

deps/rabbit/src/rabbit_node_monitor.erl

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ notify_node_up() ->
163163

164164
notify_joined_cluster() ->
165165
NewMember = node(),
166-
Nodes = rabbit_nodes:all_running() -- [NewMember],
166+
Nodes = rabbit_nodes:list_running() -- [NewMember],
167167
gen_server:abcast(Nodes, ?SERVER,
168168
{joined_cluster, node(), rabbit_mnesia:node_type()}),
169169

@@ -172,7 +172,7 @@ notify_joined_cluster() ->
172172
-spec notify_left_cluster(node()) -> 'ok'.
173173

174174
notify_left_cluster(Node) ->
175-
Nodes = rabbit_nodes:all_running(),
175+
Nodes = rabbit_nodes:list_running(),
176176
gen_server:abcast(Nodes, ?SERVER, {left_cluster, Node}),
177177
ok.
178178

@@ -409,7 +409,7 @@ handle_call(_Request, _From, State) ->
409409
{noreply, State}.
410410

411411
handle_cast(notify_node_up, State = #state{guid = GUID}) ->
412-
Nodes = rabbit_nodes:all_running() -- [node()],
412+
Nodes = rabbit_nodes:list_running() -- [node()],
413413
gen_server:abcast(Nodes, ?SERVER,
414414
{node_up, node(), rabbit_mnesia:node_type(), GUID}),
415415
%% register other active rabbits with this rabbit
@@ -466,7 +466,7 @@ handle_cast({announce_guid, Node, GUID}, State = #state{node_guids = GUIDs}) ->
466466
handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID},
467467
State = #state{guid = MyGUID,
468468
node_guids = GUIDs}) ->
469-
case lists:member(Node, rabbit_nodes:all_running()) andalso
469+
case lists:member(Node, rabbit_nodes:list_members()) andalso
470470
maps:find(Node, GUIDs) =:= {ok, NodeGUID} of
471471
true -> spawn_link( %%[1]
472472
fun () ->
@@ -619,7 +619,7 @@ handle_info({nodedown, Node, Info}, State = #state{guid = MyGUID,
619619
Node, node(), DownGUID, CheckGUID, MyGUID})
620620
end,
621621
_ = case maps:find(Node, GUIDs) of
622-
{ok, DownGUID} -> Alive = rabbit_nodes:all_running()
622+
{ok, DownGUID} -> Alive = rabbit_nodes:list_members()
623623
-- [node(), Node],
624624
[case maps:find(N, GUIDs) of
625625
{ok, CheckGUID} -> Check(N, CheckGUID, DownGUID);
@@ -818,7 +818,7 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions,
818818
%% going away. It's only safe to forget anything about partitions when
819819
%% there are no partitions.
820820
Down = Partitions -- alive_rabbit_nodes(),
821-
NoLongerPartitioned = rabbit_nodes:all_running(),
821+
NoLongerPartitioned = rabbit_nodes:list_running(),
822822
Partitions1 = case Partitions -- Down -- NoLongerPartitioned of
823823
[] -> [];
824824
_ -> Partitions
@@ -899,8 +899,7 @@ disconnect(Node) ->
899899

900900
%%--------------------------------------------------------------------
901901

902-
%% mnesia:system_info(db_nodes) (and hence
903-
%% rabbit_nodes:all_running()) does not return all nodes
902+
%% mnesia:system_info(db_nodes) does not return all nodes
904903
%% when partitioned, just those that we are sharing Mnesia state
905904
%% with. So we have a small set of replacement functions
906905
%% here. "rabbit" in a function's name implies we test if the rabbit
@@ -915,7 +914,7 @@ majority() ->
915914
majority([]).
916915

917916
majority(NodesDown) ->
918-
Nodes = rabbit_nodes:all(),
917+
Nodes = rabbit_nodes:list_members(),
919918
AliveNodes = alive_nodes(Nodes) -- NodesDown,
920919
length(AliveNodes) / length(Nodes) > 0.5.
921920

@@ -928,44 +927,44 @@ in_preferred_partition(PreferredNodes) ->
928927
in_preferred_partition(PreferredNodes, []).
929928

930929
in_preferred_partition(PreferredNodes, NodesDown) ->
931-
Nodes = rabbit_nodes:all(),
930+
Nodes = rabbit_nodes:list_members(),
932931
RealPreferredNodes = [N || N <- PreferredNodes, lists:member(N, Nodes)],
933932
AliveNodes = alive_nodes(RealPreferredNodes) -- NodesDown,
934933
RealPreferredNodes =:= [] orelse AliveNodes =/= [].
935934

936935
all_nodes_up() ->
937-
Nodes = rabbit_nodes:all(),
936+
Nodes = rabbit_nodes:list_members(),
938937
length(alive_nodes(Nodes)) =:= length(Nodes).
939938

940939
-spec all_rabbit_nodes_up() -> boolean().
941940

942941
all_rabbit_nodes_up() ->
943-
Nodes = rabbit_nodes:all(),
942+
Nodes = rabbit_nodes:list_members(),
944943
length(alive_rabbit_nodes(Nodes)) =:= length(Nodes).
945944

946-
alive_nodes() -> alive_nodes(rabbit_nodes:all()).
945+
alive_nodes() -> rabbit_nodes:list_reachable().
947946

948947
-spec alive_nodes([node()]) -> [node()].
949948

950-
alive_nodes(Nodes) -> [N || N <- Nodes, lists:member(N, [node()|nodes()])].
949+
alive_nodes(Nodes) -> rabbit_nodes:filter_reachable(Nodes).
951950

952-
alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_nodes:all()).
951+
alive_rabbit_nodes() -> rabbit_nodes:list_running().
953952

954953
-spec alive_rabbit_nodes([node()]) -> [node()].
955954

956955
alive_rabbit_nodes(Nodes) ->
957-
[N || N <- alive_nodes(Nodes), rabbit:is_running(N)].
956+
rabbit_nodes:filter_running(Nodes).
958957

959958
%% This one is allowed to connect!
960959

961960
-spec ping_all() -> 'ok'.
962961

963962
ping_all() ->
964-
[net_adm:ping(N) || N <- rabbit_nodes:all()],
963+
[net_adm:ping(N) || N <- rabbit_nodes:list_members()],
965964
ok.
966965

967966
possibly_partitioned_nodes() ->
968-
alive_rabbit_nodes() -- rabbit_nodes:all_running().
967+
rabbit_nodes:list_unreachable().
969968

970969
startup_log([]) ->
971970
rabbit_log:info("Starting rabbit_node_monitor", []);

0 commit comments

Comments
 (0)