Skip to content

Commit b062641

Browse files
committed
Add shrink_all/1 function
That takes a node and removes all quorum queue members for this node and returns an list of results for each queue. [#162782789]
1 parent 375f743 commit b062641

File tree

2 files changed

+48
-4
lines changed

2 files changed

+48
-4
lines changed

src/rabbit_quorum_queue.erl

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
-export([requeue/3]).
3838
-export([policy_changed/2]).
3939
-export([cleanup_data_dir/0]).
40+
-export([shrink_all/1]).
4041

4142
%%-include_lib("rabbit_common/include/rabbit.hrl").
4243
-include_lib("rabbit.hrl").
@@ -725,6 +726,24 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
725726
E
726727
end.
727728

729+
shrink_all(Node) ->
730+
[begin
731+
QName = amqqueue:get_name(Q),
732+
rabbit_log:info("~s: Removing member ~w",
733+
[rabbit_misc:rs(QName), Node]),
734+
case delete_member(Q, Node) of
735+
ok ->
736+
{ok, QName};
737+
Err ->
738+
rabbit_log:warning("~s: Failed to remove member ~w",
739+
[rabbit_misc:rs(QName), Node]),
740+
{error, QName, Err}
741+
end
742+
end || Q <- rabbit_amqqueue:list(),
743+
amqqueue:get_type(Q) == quorum,
744+
lists:member(Node, amqqueue:get_quorum_nodes(Q))].
745+
746+
728747
%%----------------------------------------------------------------------------
729748
dlx_mfa(Q) ->
730749
DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>,

test/quorum_queue_SUITE.erl

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ groups() ->
6868
delete_declare,
6969
metrics_cleanup_on_leadership_takeover,
7070
metrics_cleanup_on_leader_crash,
71-
consume_in_minority
71+
consume_in_minority,
72+
shrink_all
7273
]},
7374
{cluster_size_5, [], [start_queue,
7475
start_queue_concurrent,
@@ -189,7 +190,8 @@ init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publ
189190
[{rmq_nodes_count, 3},
190191
{rmq_nodename_suffix, Testcase},
191192
{tcp_ports_base},
192-
{queue_name, Q}
193+
{queue_name, Q},
194+
{alt_queue_name, <<Q/binary, "_alt">>}
193195
]),
194196
Config3 = rabbit_ct_helpers:run_steps(
195197
Config2,
@@ -209,7 +211,8 @@ init_per_testcase(Testcase, Config) ->
209211
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
210212
Q = rabbit_data_coercion:to_binary(Testcase),
211213
Config2 = rabbit_ct_helpers:set_config(Config1,
212-
[{queue_name, Q}
214+
[{queue_name, Q},
215+
{alt_queue_name, <<Q/binary, "_alt">>}
213216
]),
214217
rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()).
215218

@@ -621,7 +624,29 @@ consume_in_minority(Config) ->
621624

622625
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _},
623626
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
624-
no_ack = false})).
627+
no_ack = false})),
628+
ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
629+
ok = rabbit_ct_broker_helpers:start_node(Config, Server2),
630+
ok.
631+
632+
shrink_all(Config) ->
633+
[Server0, _Server1, Server2] =
634+
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
635+
636+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
637+
QQ = ?config(queue_name, Config),
638+
AQ = ?config(alt_queue_name, Config),
639+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
640+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
641+
?assertEqual({'queue.declare_ok', AQ, 0, 0},
642+
declare(Ch, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
643+
timer:sleep(500),
644+
Result = rpc:call(Server0, rabbit_quorum_queue, shrink_all, [Server2]),
645+
ct:pal("shring all result ~p", [Result]),
646+
?assertMatch([{ok, _}, {ok, _}], Result),
647+
ok.
648+
649+
625650

626651
subscribe_should_fail_when_global_qos_true(Config) ->
627652
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

0 commit comments

Comments
 (0)