diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 89ca718a89a4..0672e234fe6e 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -441,19 +441,28 @@ become_leader0(QName, Name) -> -spec all_replica_states() -> {node(), #{atom() => atom()}}. all_replica_states() -> Rows0 = ets:tab2list(ra_state), - Rows = lists:map(fun - ({K, follower, promotable}) -> - {K, promotable}; - ({K, follower, non_voter}) -> - {K, non_voter}; - ({K, S, _}) -> - %% voter or unknown - {K, S}; - (T) -> - T - end, Rows0), + Rows = lists:filtermap( + fun + (T = {K, _, _}) -> + case whereis(K) of + undefined -> + false; + P when is_pid(P) -> + {true, to_replica_state(T)} + end; + (_T) -> + false + end, Rows0), {node(), maps:from_list(Rows)}. +to_replica_state({K, follower, promotable}) -> + {K, promotable}; +to_replica_state({K, follower, non_voter}) -> + {K, non_voter}; +to_replica_state({K, S, _}) -> + %% voter or unknown + {K, S}. + -spec list_with_minimum_quorum() -> [amqqueue:amqqueue()]. list_with_minimum_quorum() -> Queues = rabbit_amqqueue:list_local_quorum_queues(), diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 6d61e522b91e..dd3f2f50ce0d 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -97,7 +97,8 @@ groups() -> force_all_queues_shrink_member_to_current_member, force_vhost_queues_shrink_member_to_current_member, policy_repair, - gh_12635 + gh_12635, + replica_states ] ++ all_tests()}, {cluster_size_5, [], [start_queue, @@ -4355,6 +4356,54 @@ requeue_multiple_false(Config) -> ?assertEqual(#'queue.delete_ok'{message_count = 0}, amqp_channel:call(Ch, #'queue.delete'{queue = QQ})). +replica_states(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + + [?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}])) + || Q <- [<<"Q1">>, <<"Q2">>, <<"Q3">>]], + + Qs = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []), + + [Q1_ClusterName, Q2_ClusterName, Q3_ClusterName] = + [begin + {ClusterName, _} = amqqueue:get_pid(Q), + ClusterName + end + || Q <- Qs, amqqueue:get_type(Q) == rabbit_quorum_queue], + + Result1 = rabbit_misc:append_rpc_all_nodes(Servers, rabbit_quorum_queue, all_replica_states, []), + ct:pal("all replica states: ~tp", [Result1]), + + lists:map(fun({_Node, ReplicaStates}) -> + ?assert(maps:is_key(Q1_ClusterName, ReplicaStates)), + ?assert(maps:is_key(Q2_ClusterName, ReplicaStates)), + ?assert(maps:is_key(Q3_ClusterName, ReplicaStates)) + end, Result1), + + %% Unregister a few queues (same outcome of 'noproc') + rabbit_ct_broker_helpers:rpc(Config, Server, erlang, unregister, [Q2_ClusterName]), + rabbit_ct_broker_helpers:rpc(Config, Server, erlang, unregister, [Q3_ClusterName]), + + ?assert(undefined == rabbit_ct_broker_helpers:rpc(Config, Server, erlang, whereis, [Q2_ClusterName])), + ?assert(undefined == rabbit_ct_broker_helpers:rpc(Config, Server, erlang, whereis, [Q3_ClusterName])), + + Result2 = rabbit_misc:append_rpc_all_nodes(Servers, rabbit_quorum_queue, all_replica_states, []), + ct:pal("replica states with a node missing Q1 and Q2: ~tp", [Result2]), + + lists:map(fun({Node, ReplicaStates}) -> + if Node == Server -> + ?assert(maps:is_key(Q1_ClusterName, ReplicaStates)), + ?assertNot(maps:is_key(Q2_ClusterName, ReplicaStates)), + ?assertNot(maps:is_key(Q3_ClusterName, ReplicaStates)); + true -> + ?assert(maps:is_key(Q1_ClusterName, ReplicaStates)), + ?assert(maps:is_key(Q2_ClusterName, ReplicaStates)), + ?assert(maps:is_key(Q3_ClusterName, ReplicaStates)) + end + end, Result2). + %%---------------------------------------------------------------------------- same_elements(L1, L2) diff --git a/deps/rabbit/test/unit_quorum_queue_SUITE.erl b/deps/rabbit/test/unit_quorum_queue_SUITE.erl index 2f4a7e7133b6..4ca2cf70fd33 100644 --- a/deps/rabbit/test/unit_quorum_queue_SUITE.erl +++ b/deps/rabbit/test/unit_quorum_queue_SUITE.erl @@ -7,7 +7,7 @@ all() -> [ - all_replica_states_includes_nonvoters, + all_replica_states_includes_alive_nonvoters, filter_nonvoters, filter_quorum_critical_accounts_nonvoters, ra_machine_conf_delivery_limit @@ -97,27 +97,29 @@ filter_nonvoters(_Config) -> [Q4] = rabbit_quorum_queue:filter_promotable(Qs, Ss), ok. -all_replica_states_includes_nonvoters(_Config) -> +all_replica_states_includes_alive_nonvoters(_Config) -> ets:new(ra_state, [named_table, public, {write_concurrency, true}]), + QPids = start_qprocs(_AliveQs = [q1, q2, q3, q4]), ets:insert(ra_state, [ {q1, leader, voter}, {q2, follower, voter}, {q3, follower, promotable}, {q4, init, unknown}, - %% pre ra-2.7.0 - {q5, leader}, - {q6, follower} + %% queues in ra_state but not alive + {q5, leader, voter}, + {q6, follower, noproc} ]), {_, #{ q1 := leader, q2 := follower, q3 := promotable, - q4 := init, - q5 := leader, - q6 := follower - }} = rabbit_quorum_queue:all_replica_states(), + q4 := init + } = ReplicaStates} = rabbit_quorum_queue:all_replica_states(), + ?assertNot(maps:is_key(q5, ReplicaStates)), + ?assertNot(maps:is_key(q6, ReplicaStates)), true = ets:delete(ra_state), + _ = stop_qprocs(QPids), ok. make_ra_machine_conf(Q0, Arg, Pol, OpPol) -> @@ -128,3 +130,13 @@ make_ra_machine_conf(Q0, Arg, Pol, OpPol) -> {definition, [{<<"delivery-limit">>,OpPol}]}]), rabbit_quorum_queue:ra_machine_config(Q). +start_qprocs(Qs) -> + [begin + QPid = spawn(fun() -> receive done -> ok end end), + erlang:register(Q, QPid), + QPid + end || Q <- Qs]. + +stop_qprocs(Pids) -> + [erlang:send(P, done)|| P <- Pids]. +