Skip to content

Commit 4563228

Browse files
Merge pull request #1819 from rabbitmq/qq-testing
Testing of quorum queues
2 parents 3c42423 + 2ca9c29 commit 4563228

File tree

8 files changed

+344
-57
lines changed

8 files changed

+344
-57
lines changed

src/rabbit_amqqueue.erl

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1278,6 +1278,9 @@ forget_all_durable(Node) ->
12781278
%% Try to promote a slave while down - it should recover as a
12791279
%% master. We try to take the oldest slave here for best chance of
12801280
%% recovery.
1281+
%% Selects the list of candidate nodes for the queue and delegates to
%% forget_node_for_queue/3: queues of type `quorum' use their
%% quorum_nodes, every other queue uses its recoverable_slaves.
forget_node_for_queue(DeadNode, #amqqueue{type = quorum} = Queue) ->
    #amqqueue{quorum_nodes = QuorumNodes} = Queue,
    forget_node_for_queue(DeadNode, QuorumNodes, Queue);
forget_node_for_queue(DeadNode, #amqqueue{} = Queue) ->
    Candidates = Queue#amqqueue.recoverable_slaves,
    forget_node_for_queue(DeadNode, Candidates, Queue).
12831286

@@ -1291,11 +1294,12 @@ forget_node_for_queue(_DeadNode, [], #amqqueue{name = Name}) ->
12911294
forget_node_for_queue(DeadNode, [DeadNode | T], Q) ->
12921295
forget_node_for_queue(DeadNode, T, Q);
12931296

1294-
forget_node_for_queue(DeadNode, [H|T], Q) ->
1295-
case node_permits_offline_promotion(H) of
1296-
false -> forget_node_for_queue(DeadNode, T, Q);
1297-
true -> Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H)},
1298-
ok = mnesia:write(rabbit_durable_queue, Q1, write)
1297+
forget_node_for_queue(DeadNode, [H|T], #amqqueue{type = Type} = Q) ->
1298+
case {node_permits_offline_promotion(H), Type} of
1299+
{false, _} -> forget_node_for_queue(DeadNode, T, Q);
1300+
{true, classic} -> Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H)},
1301+
ok = mnesia:write(rabbit_durable_queue, Q1, write);
1302+
{true, quorum} -> ok
12991303
end.
13001304

13011305
node_permits_offline_promotion(Node) ->

src/rabbit_channel.erl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,6 @@ handle_cast({method, Method, Content, Flow},
562562
flow -> credit_flow:ack(Reader);
563563
noflow -> ok
564564
end,
565-
566565
try handle_method(rabbit_channel_interceptor:intercept_in(
567566
expand_shortcuts(Method, State), Content, IState),
568567
State) of

src/rabbit_fifo.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
query_ra_indexes/1,
4343
query_consumer_count/1,
4444
query_consumers/1,
45+
query_stat/1,
4546
usage/1,
4647

4748
zero/1,
@@ -721,7 +722,10 @@ query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsume
721722
Acc)
722723
end, #{}, WaitingConsumers),
723724
maps:merge(FromConsumers, FromWaitingConsumers).
724-
%% other
725+
726+
%% Query function returning {MessageCount, ConsumerCount}: the sizes of
%% the `messages' and `consumers' maps held in the machine state.
query_stat(#state{messages = Messages, consumers = Consumers}) ->
    MessageCount = maps:size(Messages),
    ConsumerCount = maps:size(Consumers),
    {MessageCount, ConsumerCount}.
725729

726730
-spec usage(atom()) -> float().
727731
usage(Name) when is_atom(Name) ->

src/rabbit_fifo_client.erl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
untracked_enqueue/2,
3939
purge/1,
4040
cluster_name/1,
41-
update_machine_state/2
41+
update_machine_state/2,
42+
stat/1
4243
]).
4344

4445
-include_lib("ra/include/ra.hrl").
@@ -398,6 +399,13 @@ purge(Node) ->
398399
Err
399400
end.
400401

402+
%% @doc Returns the `{Messages, Consumers}' tuple produced by running
%% rabbit_fifo:query_stat/1 as a local query against the given server.
%%
%% The result is returned bare (not wrapped in `{ok, _}'). Any reply from
%% ra:local_query/2 other than `{ok, {_, Stat}, _}' (for example an
%% `{error, _}' or `{timeout, _}' tuple) makes this function crash with
%% `badmatch'; callers that want a fallback must catch it themselves.
-spec stat(ra_server_id()) -> {non_neg_integer(), non_neg_integer()}.
stat(Leader) ->
    %% An external fun reference is used instead of a wrapping closure:
    %% it performs the same call without capturing any environment.
    {ok, {_, Stat}, _} = ra:local_query(Leader, fun rabbit_fifo:query_stat/1),
    Stat.
408+
401409
%% @doc returns the cluster name
402410
-spec cluster_name(state()) -> cluster_name().
403411
cluster_name(#state{cluster_name = ClusterName}) ->

src/rabbit_quorum_queue.erl

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -432,8 +432,15 @@ infos(QName) ->
432432
%% Builds a list of {Item, Value} pairs, resolving every requested item
%% for the queue through i/2.
info(Q, Items) ->
    [{Key, i(Key, Q)} || Key <- Items].
434434

435-
stat(_Q) ->
436-
{ok, 0, 0}. %% TODO length, consumers count
435+
%% Returns {ok, MessageCount, ConsumerCount} for a quorum queue, using
%% rabbit_fifo_client:stat/1 on the server id stored in the queue's `pid'
%% field. Any exception raised while fetching the figures is swallowed
%% and reported as {ok, 0, 0}.
stat(#amqqueue{pid = LeaderId}) ->
    try
        {MessageCount, ConsumerCount} = rabbit_fifo_client:stat(LeaderId),
        {ok, MessageCount, ConsumerCount}
    catch
        _Class:_Reason ->
            %% Leader is not available, cluster might be in minority
            {ok, 0, 0}
    end.
437444

438445
%% Delegates directly to rabbit_fifo_client:purge/1 and returns its result.
purge(Node) ->
    rabbit_fifo_client:purge(Node).

test/dynamic_qq_SUITE.erl

Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% Version 1.1 (the "License"); you may not use this file except in
3+
%% compliance with the License. You may obtain a copy of the License
4+
%% at http://www.mozilla.org/MPL/
5+
%%
6+
%% Software distributed under the License is distributed on an "AS IS"
7+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
%% the License for the specific language governing rights and
9+
%% limitations under the License.
10+
%%
11+
%% The Original Code is RabbitMQ.
12+
%%
13+
%% The Initial Developer of the Original Code is GoPivotal, Inc.
14+
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
15+
%%
16+
%% This test suite is an adaptation of dynamic_ha_SUITE for quorum queues.
17+
%% Some of its test cases do not make sense for quorum queues; the ones that could be adapted are included here.
18+
%%
19+
%%
20+
-module(dynamic_qq_SUITE).
21+
22+
-include_lib("common_test/include/ct.hrl").
23+
-include_lib("proper/include/proper.hrl").
24+
-include_lib("eunit/include/eunit.hrl").
25+
-include_lib("amqp_client/include/amqp_client.hrl").
26+
27+
-import(quorum_queue_utils, [wait_for_messages_ready/3,
28+
ra_name/1]).
29+
30+
-compile(export_all).
31+
32+
%% Common Test entry point: the suite consists of the `clustered' group only.
all() ->
    [{group, clustered}].
36+
37+
%% Common Test group layout: a top-level `clustered' group holding one
%% sub-group per cluster size, each listing the test cases it runs.
groups() ->
    TwoNodeCases = [vhost_deletion,
                    force_delete_if_no_consensus,
                    takeover_on_failure,
                    takeover_on_shutdown,
                    quorum_unaffected_after_vhost_failure],
    ThreeNodeCases = [recover_follower_after_standalone_restart],
    SizeGroups = [{cluster_size_2, [], TwoNodeCases},
                  {cluster_size_3, [], ThreeNodeCases}],
    [{clustered, [], SizeGroups}].
52+
53+
%% -------------------------------------------------------------------
54+
%% Testsuite setup/teardown.
55+
%% -------------------------------------------------------------------
56+
57+
%% Suite-level setup: logs the environment, then returns the result of
%% running the rabbit_ct_helpers setup steps on Config.
init_per_suite(Config) ->
    rabbit_ct_helpers:log_environment(),
    rabbit_ct_helpers:run_setup_steps(Config).
60+
61+
%% Suite-level teardown: returns the result of running the
%% rabbit_ct_helpers teardown steps on Config.
end_per_suite(Config) ->
    rabbit_ct_helpers:run_teardown_steps(Config).
63+
64+
%% Group-level setup: stores the setting that belongs to each group in
%% Config (clustering flag for `clustered', node count for the
%% cluster_size_N groups).
init_per_group(clustered, Config) ->
    Settings = [{rmq_nodes_clustered, true}],
    rabbit_ct_helpers:set_config(Config, Settings);
init_per_group(cluster_size_2, Config) ->
    Settings = [{rmq_nodes_count, 2}],
    rabbit_ct_helpers:set_config(Config, Settings);
init_per_group(cluster_size_3, Config) ->
    Settings = [{rmq_nodes_count, 3}],
    rabbit_ct_helpers:set_config(Config, Settings).
70+
71+
%% Group-level teardown: nothing to clean up, Config is returned unchanged.
end_per_group(_Group, Config) ->
    Config.
73+
74+
%% Per-testcase setup: records the testcase start, derives a queue name
%% from the group and testcase names, stores the node name suffix, TCP
%% port base, queue name and quorum queue arguments in Config, and then
%% runs the broker and client setup steps.
init_per_testcase(Testcase, Config) ->
    rabbit_ct_helpers:testcase_started(Config, Testcase),
    NodeCount = ?config(rmq_nodes_count, Config),
    TestIndex = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
    GroupProps = ?config(tc_group_properties, Config),
    GroupName = proplists:get_value(name, GroupProps),
    QueueName = rabbit_data_coercion:to_binary(
                  io_lib:format("~p_~p", [GroupName, Testcase])),
    Settings = [{rmq_nodename_suffix, Testcase},
                {tcp_ports_base, {skip_n_nodes, TestIndex * NodeCount}},
                {queue_name, QueueName},
                {queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}],
    Config1 = rabbit_ct_helpers:set_config(Config, Settings),
    SetupSteps = rabbit_ct_broker_helpers:setup_steps() ++
                 rabbit_ct_client_helpers:setup_steps(),
    rabbit_ct_helpers:run_steps(Config1, SetupSteps).
89+
90+
%% Per-testcase teardown: runs the client teardown steps followed by the
%% broker teardown steps, then records the testcase as finished.
end_per_testcase(Testcase, Config) ->
    TeardownSteps = rabbit_ct_client_helpers:teardown_steps() ++
                    rabbit_ct_broker_helpers:teardown_steps(),
    Config1 = rabbit_ct_helpers:run_steps(Config, TeardownSteps),
    rabbit_ct_helpers:testcase_finished(Config1, Testcase).
95+
96+
%% -------------------------------------------------------------------
97+
%% Testcases.
98+
%% -------------------------------------------------------------------
99+
%% Vhost deletion needs to successfully tear down queues.
100+
%% Declares a durable quorum queue through node 0, deletes the "/" vhost
%% via rabbit_vhost:delete/2 and asserts that `list_queues' then reports
%% no queues.
vhost_deletion(Config) ->
    FirstNode = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Channel = rabbit_ct_client_helpers:open_channel(Config, FirstNode),
    Declare = #'queue.declare'{queue = ?config(queue_name, Config),
                               arguments = ?config(queue_args, Config),
                               durable = true},
    amqp_channel:call(Channel, Declare),
    ok = rpc:call(FirstNode, rabbit_vhost, delete,
                  [<<"/">>, <<"acting-user">>]),
    ?assertMatch([],
                 rabbit_ct_broker_helpers:rabbitmqctl_list(
                   Config, 0, ["list_queues", "name"])),
    ok.
114+
115+
%% Declares a durable quorum queue through the first node and publishes 10
%% messages, restarts the second node and stops the first one. On the
%% remaining node a passive declare must still succeed, while
%% `queue.delete' is expected to exit with a server-initiated connection
%% close carrying code 541.
force_delete_if_no_consensus(Config) ->
    [NodeA, NodeB] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    ChA = rabbit_ct_client_helpers:open_channel(Config, NodeA),
    QName = ?config(queue_name, Config),
    Args = ?config(queue_args, Config),
    Declare = #'queue.declare'{queue = QName,
                               arguments = Args,
                               durable = true},
    amqp_channel:call(ChA, Declare),
    rabbit_ct_client_helpers:publish(ChA, QName, 10),
    ok = rabbit_ct_broker_helpers:restart_node(Config, NodeB),
    ok = rabbit_ct_broker_helpers:stop_node(Config, NodeA),

    BCh = rabbit_ct_client_helpers:open_channel(Config, NodeB),
    ?assertMatch(
       #'queue.declare_ok'{},
       amqp_channel:call(
         BCh, #'queue.declare'{queue = QName,
                               arguments = Args,
                               durable = true,
                               passive = true})),
    %% TODO implement a force delete
    BCh2 = rabbit_ct_client_helpers:open_channel(Config, NodeB),
    ?assertExit({{shutdown,
                  {connection_closing, {server_initiated_close, 541, _}}}, _},
                amqp_channel:call(BCh2, #'queue.delete'{queue = QName})),
    ok.
142+
143+
%% Runs the takeover scenario with the first node taken down via
%% rabbit_ct_broker_helpers:kill_node/2.
takeover_on_failure(Config) ->
    takeover_on(Config, kill_node).
145+
146+
%% Runs the takeover scenario with the first node taken down via
%% rabbit_ct_broker_helpers:stop_node/2.
takeover_on_shutdown(Config) ->
    takeover_on(Config, stop_node).
148+
149+
%% Shared takeover scenario. Declares a durable quorum queue through the
%% first node, publishes 10 messages, restarts the second node and then
%% takes the first node down with rabbit_ct_broker_helpers:Fun/2
%% (Fun is the name of the helper to call, e.g. kill_node or stop_node).
%% While the first node is down, declaring through the second node must
%% report message_count = 0; once the first node is started again,
%% declaring through it must report message_count = 10.
takeover_on(Config, Fun) ->
    [NodeA, NodeB] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

    ChA = rabbit_ct_client_helpers:open_channel(Config, NodeA),
    QueueName = ?config(queue_name, Config),
    %% The same declare frame is used for all three declarations below.
    Declare = #'queue.declare'{queue = QueueName,
                               arguments = ?config(queue_args, Config),
                               durable = true},
    amqp_channel:call(ChA, Declare),
    rabbit_ct_client_helpers:publish(ChA, QueueName, 10),
    ok = rabbit_ct_broker_helpers:restart_node(Config, NodeB),

    ok = rabbit_ct_broker_helpers:Fun(Config, NodeA),

    ChB = rabbit_ct_client_helpers:open_channel(Config, NodeB),
    #'queue.declare_ok'{message_count = 0} = amqp_channel:call(ChB, Declare),

    ok = rabbit_ct_broker_helpers:start_node(Config, NodeA),
    ChA2 = rabbit_ct_client_helpers:open_channel(Config, NodeA),
    #'queue.declare_ok'{message_count = 10} = amqp_channel:call(ChA2, Declare),
    ok.
178+
179+
%% Declares a durable quorum queue, checks that both nodes are listed as
%% `online' in rabbit_quorum_queue:infos/1, sends an exit signal to the "/"
%% vhost supervisor on both nodes and then checks that the `online' list
%% is still the same.
quorum_unaffected_after_vhost_failure(Config) ->
    Servers0 = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    [NodeA, NodeB] = Servers0,
    Servers = lists:sort(Servers0),

    ChA = rabbit_ct_client_helpers:open_channel(Config, NodeA),
    QName = ?config(queue_name, Config),
    Declare = #'queue.declare'{queue = QName,
                               arguments = ?config(queue_args, Config),
                               durable = true},
    amqp_channel:call(ChA, Declare),
    timer:sleep(300),

    QResource = rabbit_misc:r(<<"/">>, queue, QName),
    Info0 = rpc:call(NodeA, rabbit_quorum_queue, infos, [QResource]),
    ?assertEqual(Servers, lists:sort(proplists:get_value(online, Info0, []))),

    %% Crash vhost on both nodes
    lists:foreach(
      fun(Node) ->
              {ok, VHostSup} = rabbit_ct_broker_helpers:rpc(
                                 Config, Node, rabbit_vhost_sup_sup,
                                 get_vhost_sup, [<<"/">>]),
              exit(VHostSup, foo)
      end, [NodeA, NodeB]),

    Info = rpc:call(NodeA, rabbit_quorum_queue, infos, [QResource]),
    ?assertEqual(Servers, lists:sort(proplists:get_value(online, Info, []))).
205+
206+
%% Tests that followers can be brought up standalone after forgetting the
%% rest of the cluster. Consensus won't be reached as there is only one
%% node in the new cluster.
%%
%% Steps: declare a durable quorum queue through the first node, publish 15
%% messages and wait until all three servers report them as ready; stop
%% every node; then, for each of the two other nodes in turn, forget the
%% remaining cluster members offline, start the node alone, wait for the
%% 15 ready messages and stop it again.
recover_follower_after_standalone_restart(Config) ->
    Servers = [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    Ch = rabbit_ct_client_helpers:open_channel(Config, A),

    QName = ?config(queue_name, Config),
    Args = ?config(queue_args, Config),
    amqp_channel:call(Ch, #'queue.declare'{queue = QName,
                                           arguments = Args,
                                           durable = true
                                          }),

    rabbit_ct_client_helpers:publish(Ch, QName, 15),
    rabbit_ct_client_helpers:close_channel(Ch),

    Name = ra_name(QName),
    wait_for_messages_ready(Servers, Name, 15),

    %% Assert that every node really stopped: the offline
    %% forget_cluster_node calls below are only meaningful on stopped
    %% nodes, and the rest of this suite asserts stop_node results too.
    ok = rabbit_ct_broker_helpers:stop_node(Config, C),
    ok = rabbit_ct_broker_helpers:stop_node(Config, B),
    ok = rabbit_ct_broker_helpers:stop_node(Config, A),

    %% Restart one follower
    forget_cluster_node(Config, B, C),
    forget_cluster_node(Config, B, A),

    ok = rabbit_ct_broker_helpers:start_node(Config, B),
    wait_for_messages_ready([B], Name, 15),
    ok = rabbit_ct_broker_helpers:stop_node(Config, B),

    %% Restart the other
    forget_cluster_node(Config, C, B),
    forget_cluster_node(Config, C, A),

    ok = rabbit_ct_broker_helpers:start_node(Config, C),
    wait_for_messages_ready([C], Name, 15),
    ok = rabbit_ct_broker_helpers:stop_node(Config, C),

    ok.
247+
248+
%%----------------------------------------------------------------------------
249+
%% Runs `rabbitmqctl forget_cluster_node --offline NodeToRemove' against
%% Node and returns whatever the rabbitmqctl helper returns.
forget_cluster_node(Config, Node, NodeToRemove) ->
    CtlArgs = ["forget_cluster_node", "--offline", NodeToRemove],
    rabbit_ct_broker_helpers:rabbitmqctl(Config, Node, CtlArgs).

0 commit comments

Comments
 (0)