Skip to content

Commit 32e5d35

Browse files
Merge pull request #1867 from rabbitmq/publisher-confirms-testing
Publisher confirms test suite for all queue types
2 parents 4707e09 + a938f41 commit 32e5d35

File tree

3 files changed

+381
-103
lines changed

3 files changed

+381
-103
lines changed
Lines changed: 381 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,381 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% Version 1.1 (the "License"); you may not use this file except in
3+
%% compliance with the License. You may obtain a copy of the License at
4+
%% http://www.mozilla.org/MPL/
5+
%%
6+
%% Software distributed under the License is distributed on an "AS IS"
7+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
%% License for the specific language governing rights and limitations
9+
%% under the License.
10+
%%
11+
%% The Original Code is RabbitMQ.
12+
%%
13+
%% The Initial Developer of the Original Code is GoPivotal, Inc.
14+
%% Copyright (c) 2011-2019 Pivotal Software, Inc. All rights reserved.
15+
%%
16+
%%
17+
%% For the full spec see: http://www.rabbitmq.com/confirms.html
18+
%%
19+
-module(publisher_confirms_parallel_SUITE).
20+
21+
-include_lib("common_test/include/ct.hrl").
22+
-include_lib("kernel/include/file.hrl").
23+
-include_lib("amqp_client/include/amqp_client.hrl").
24+
-include_lib("eunit/include/eunit.hrl").
25+
26+
-compile(export_all).
27+
28+
-define(TIMEOUT, 60000).
29+
30+
-import(quorum_queue_utils, [wait_for_messages/2]).
31+
32+
all() ->
33+
[
34+
{group, publisher_confirm_tests}
35+
].
36+
37+
groups() ->
38+
PublisherConfirmTests = [publisher_confirms,
39+
publisher_confirms_with_deleted_queue,
40+
confirm_select_ok,
41+
confirm_nowait,
42+
confirm_ack,
43+
confirm_acks,
44+
confirm_mandatory_unroutable,
45+
confirm_unroutable_message],
46+
[
47+
{publisher_confirm_tests, [],
48+
[
49+
{classic_queue, [parallel], PublisherConfirmTests ++ [confirm_nack]},
50+
{mirrored_queue, [parallel], PublisherConfirmTests ++ [confirm_nack]},
51+
{quorum_queue, [parallel], PublisherConfirmTests ++ [confirm_minority]}
52+
]}
53+
].
54+
55+
suite() ->
56+
[
57+
{timetrap, {minutes, 3}}
58+
].
59+
60+
%% -------------------------------------------------------------------
61+
%% Testsuite setup/teardown.
62+
%% -------------------------------------------------------------------
63+
64+
init_per_suite(Config) ->
65+
rabbit_ct_helpers:log_environment(),
66+
rabbit_ct_helpers:run_setup_steps(Config).
67+
68+
end_per_suite(Config) ->
69+
rabbit_ct_helpers:run_teardown_steps(Config).
70+
71+
init_per_group(classic_queue, Config) ->
72+
rabbit_ct_helpers:set_config(
73+
Config,
74+
[{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
75+
{queue_durable, true}]);
76+
init_per_group(quorum_queue, Config) ->
77+
rabbit_ct_helpers:set_config(
78+
Config,
79+
[{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
80+
{queue_durable, true}]);
81+
init_per_group(mirrored_queue, Config) ->
82+
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
83+
<<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
84+
Config1 = rabbit_ct_helpers:set_config(
85+
Config, [{is_mirrored, true},
86+
{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
87+
{queue_durable, true}]),
88+
rabbit_ct_helpers:run_steps(Config1, []);
89+
init_per_group(Group, Config) ->
90+
case lists:member({group, Group}, all()) of
91+
true ->
92+
ClusterSize = 2,
93+
Config1 = rabbit_ct_helpers:set_config(Config, [
94+
{rmq_nodename_suffix, Group},
95+
{rmq_nodes_count, ClusterSize}
96+
]),
97+
rabbit_ct_helpers:run_steps(Config1,
98+
rabbit_ct_broker_helpers:setup_steps() ++
99+
rabbit_ct_client_helpers:setup_steps());
100+
false ->
101+
rabbit_ct_helpers:run_steps(Config, [])
102+
end.
103+
104+
end_per_group(Group, Config) ->
105+
case lists:member({group, Group}, all()) of
106+
true ->
107+
rabbit_ct_helpers:run_steps(Config,
108+
rabbit_ct_client_helpers:teardown_steps() ++
109+
rabbit_ct_broker_helpers:teardown_steps());
110+
false ->
111+
Config
112+
end.
113+
114+
init_per_testcase(Testcase, Config) ->
115+
Group = proplists:get_value(name, ?config(tc_group_properties, Config)),
116+
Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])),
117+
Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])),
118+
Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q},
119+
{queue_name_2, Q2}]),
120+
rabbit_ct_helpers:testcase_started(Config1, Testcase).
121+
122+
end_per_testcase(Testcase, Config) ->
123+
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
124+
amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}),
125+
amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_2, Config)}),
126+
rabbit_ct_helpers:testcase_finished(Config, Testcase).
127+
128+
%% To enable confirms, a client sends the confirm.select method
129+
publisher_confirms(Config) ->
130+
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
131+
QName = ?config(queue_name, Config),
132+
declare_queue(Ch, Config, QName),
133+
amqp_channel:call(Ch, #'confirm.select'{}),
134+
amqp_channel:register_confirm_handler(Ch, self()),
135+
publish(Ch, QName, [<<"msg1">>]),
136+
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
137+
amqp_channel:wait_for_confirms(Ch, 5000),
138+
amqp_channel:unregister_confirm_handler(Ch),
139+
ok.
140+
141+
publisher_confirms_with_deleted_queue(Config) ->
142+
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
143+
QName = ?config(queue_name, Config),
144+
declare_queue(Ch, Config, QName),
145+
amqp_channel:call(Ch, #'confirm.select'{}),
146+
amqp_channel:register_confirm_handler(Ch, self()),
147+
publish(Ch, QName, [<<"msg1">>]),
148+
amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
149+
amqp_channel:wait_for_confirms_or_die(Ch, 5000),
150+
amqp_channel:unregister_confirm_handler(Ch).
151+
152+
%% Depending on whether no-wait was set or not, the broker may respond with a confirm.select-ok
153+
confirm_select_ok(Config) ->
154+
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
155+
QName = ?config(queue_name, Config),
156+
declare_queue(Ch, Config, QName),
157+
?assertEqual(#'confirm.select_ok'{}, amqp_channel:call(Ch, #'confirm.select'{nowait = false})).
158+
159+
confirm_nowait(Config) ->
160+
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
161+
QName = ?config(queue_name, Config),
162+
declare_queue(Ch, Config, QName),
163+
?assertEqual(ok, amqp_channel:call(Ch, #'confirm.select'{nowait = true})).
164+
165+
%% The broker then confirms messages as it handles them by sending a basic.ack on the same channel.
166+
%% The delivery-tag field contains the sequence number of the confirmed message.
167+
confirm_ack(Config) ->
168+
%% Ensure we receive an ack and not a nack
169+
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
170+
QName = ?config(queue_name, Config),
171+
declare_queue(Ch, Config, QName),
172+
amqp_channel:call(Ch, #'confirm.select'{}),
173+
amqp_channel:register_confirm_handler(Ch, self()),
174+
publish(Ch, QName, [<<"msg1">>]),
175+
receive
176+
#'basic.ack'{delivery_tag = 1} ->
177+
ok
178+
after 5000 ->
179+
throw(missing_ack)
180+
end.
181+
182+
%% The broker may also set the multiple field in basic.ack to indicate that all messages up to
183+
%% and including the one with the sequence number have been handled.
184+
confirm_acks(Config) ->
185+
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
186+
QName = ?config(queue_name, Config),
187+
declare_queue(Ch, Config, QName),
188+
amqp_channel:call(Ch, #'confirm.select'{}),
189+
amqp_channel:register_confirm_handler(Ch, self()),
190+
publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>, <<"msg4">>]),
191+
receive_many(lists:seq(1, 4)).
192+
193+
%% For unroutable messages, the broker will issue a confirm once the exchange verifies a message
194+
%% won't route to any queue (returns an empty list of queues).
195+
%% If the message is also published as mandatory, the basic.return is sent to the client before
196+
%% basic.ack.
197+
confirm_mandatory_unroutable(Config) ->
198+
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
199+
QName = ?config(queue_name, Config),
200+
amqp_channel:call(Ch, #'confirm.select'{}),
201+
amqp_channel:register_confirm_handler(Ch, self()),
202+
amqp_channel:register_return_handler(Ch, self()),
203+
ok = amqp_channel:call(Ch, #'basic.publish'{routing_key = QName,
204+
mandatory = true}, #amqp_msg{payload = <<"msg1">>}),
205+
receive
206+
{#'basic.return'{}, _} ->
207+
ok
208+
after 5000 ->
209+
throw(missing_return)
210+
end,
211+
receive
212+
#'basic.ack'{delivery_tag = 1} ->
213+
ok
214+
after 5000 ->
215+
throw(missing_ack)
216+
end.
217+
218+
confirm_unroutable_message(Config) ->
219+
%% Ensure we receive a nack for an unroutable message
220+
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
221+
QName = ?config(queue_name, Config),
222+
amqp_channel:call(Ch, #'confirm.select'{}),
223+
amqp_channel:register_confirm_handler(Ch, self()),
224+
publish(Ch, QName, [<<"msg1">>]),
225+
receive
226+
{#'basic.return'{}, _} ->
227+
throw(unexpected_basic_return);
228+
#'basic.ack'{delivery_tag = 1} ->
229+
ok
230+
after 5000 ->
231+
throw(missing_ack)
232+
end.
233+
234+
%% In exceptional cases when the broker is unable to handle messages successfully,
235+
%% instead of a basic.ack, the broker will send a basic.nack.
236+
%% basic.nack will only be delivered if an internal error occurs in the Erlang process
237+
%% responsible for a queue.
238+
%% This test crashes the queue before it has time to answer, but it only works for classic
239+
%% queues. On quorum queues the followers will take over and rabbit_fifo_client will resend
240+
%% any pending messages.
241+
confirm_nack(Config) ->
242+
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
243+
?MODULE, confirm_nack1, [Config]).
244+
245+
confirm_nack1(Config) ->
246+
{_Writer, _Limiter, Ch} = rabbit_ct_broker_helpers:test_channel(),
247+
ok = rabbit_channel:do(Ch, #'channel.open'{}),
248+
receive #'channel.open_ok'{} -> ok
249+
after ?TIMEOUT -> throw(failed_to_receive_channel_open_ok)
250+
end,
251+
Args = ?config(queue_args, Config),
252+
Durable = ?config(queue_durable, Config),
253+
QName1 = ?config(queue_name, Config),
254+
QName2 = ?config(queue_name_2, Config),
255+
DeclareBindDurableQueue =
256+
fun(QName) ->
257+
rabbit_channel:do(Ch, #'queue.declare'{durable = Durable,
258+
queue = QName,
259+
arguments = Args}),
260+
receive #'queue.declare_ok'{} ->
261+
rabbit_channel:do(Ch, #'queue.bind'{
262+
queue = QName,
263+
exchange = <<"amq.direct">>,
264+
routing_key = "confirms-magic" }),
265+
receive #'queue.bind_ok'{} -> ok
266+
after ?TIMEOUT -> throw(failed_to_bind_queue)
267+
end
268+
after ?TIMEOUT -> throw(failed_to_declare_queue)
269+
end
270+
end,
271+
%% Declare and bind two queues
272+
DeclareBindDurableQueue(QName1),
273+
DeclareBindDurableQueue(QName2),
274+
%% Get the first one's pid (we'll crash it later)
275+
{ok, Q1} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName1)),
276+
QPid1 = amqqueue:get_pid(Q1),
277+
%% Enable confirms
278+
rabbit_channel:do(Ch, #'confirm.select'{}),
279+
receive
280+
#'confirm.select_ok'{} -> ok
281+
after ?TIMEOUT -> throw(failed_to_enable_confirms)
282+
end,
283+
%% Publish a message
284+
rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"amq.direct">>,
285+
routing_key = "confirms-magic"
286+
},
287+
rabbit_basic:build_content(
288+
#'P_basic'{delivery_mode = 2}, <<"">>)),
289+
%% We must not kill the queue before the channel has processed the
290+
%% 'publish'.
291+
ok = rabbit_channel:flush(Ch),
292+
%% Crash the queue
293+
QPid1 ! boom,
294+
%% Wait for a nack
295+
receive
296+
#'basic.nack'{} -> ok;
297+
#'basic.ack'{} -> throw(received_ack_instead_of_nack)
298+
after ?TIMEOUT-> throw(did_not_receive_nack)
299+
end,
300+
receive
301+
#'basic.ack'{} -> throw(received_ack_when_none_expected)
302+
after 1000 -> ok
303+
end,
304+
%% Cleanup
305+
unlink(Ch),
306+
ok = rabbit_channel:shutdown(Ch),
307+
passed.
308+
309+
%% The closest to a nack behaviour that we can get on quorum queues is not answering while
310+
%% the cluster is in minority. Once the cluster recovers, a 'basic.ack' will be issued.
311+
confirm_minority(Config) ->
312+
[_A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
313+
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
314+
QName = ?config(queue_name, Config),
315+
declare_queue(Ch, Config, QName),
316+
ok = rabbit_ct_broker_helpers:stop_node(Config, B),
317+
amqp_channel:call(Ch, #'confirm.select'{}),
318+
amqp_channel:register_confirm_handler(Ch, self()),
319+
publish(Ch, QName, [<<"msg1">>]),
320+
receive
321+
#'basic.nack'{} -> throw(unexpected_nack);
322+
#'basic.ack'{} -> throw(unexpected_ack)
323+
after 30000 ->
324+
ok
325+
end,
326+
ok = rabbit_ct_broker_helpers:start_node(Config, B),
327+
receive
328+
#'basic.nack'{} -> throw(unexpected_nack);
329+
#'basic.ack'{} -> ok
330+
after 60000 ->
331+
throw(missing_ack)
332+
end.
333+
334+
%%%%%%%%%%%%%%%%%%%%%%%%
335+
%% Test helpers
336+
%%%%%%%%%%%%%%%%%%%%%%%%
337+
declare_queue(Ch, Config, QName) ->
338+
Args = ?config(queue_args, Config),
339+
Durable = ?config(queue_durable, Config),
340+
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName,
341+
arguments = Args,
342+
durable = Durable}).
343+
344+
publish(Ch, QName, Payloads) ->
345+
[amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload})
346+
|| Payload <- Payloads].
347+
348+
publish(Ch, QName, Payloads, Headers) ->
349+
[amqp_channel:call(Ch, #'basic.publish'{routing_key = QName},
350+
#amqp_msg{payload = Payload,
351+
props = #'P_basic'{headers = Headers}})
352+
|| Payload <- Payloads].
353+
354+
consume(Ch, QName, Payloads) ->
355+
[begin
356+
{#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = Payload}} =
357+
amqp_channel:call(Ch, #'basic.get'{queue = QName}),
358+
DTag
359+
end || Payload <- Payloads].
360+
361+
consume_empty(Ch, QName) ->
362+
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
363+
364+
sync_mirrors(QName, Config) ->
365+
case ?config(is_mirrored, Config) of
366+
true ->
367+
rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, [<<"sync_queue">>, QName]);
368+
_ -> ok
369+
end.
370+
371+
receive_many([]) ->
372+
ok;
373+
receive_many(DTags) ->
374+
receive
375+
#'basic.ack'{delivery_tag = DTag, multiple = true} ->
376+
receive_many(DTags -- lists:seq(1, DTag));
377+
#'basic.ack'{delivery_tag = DTag, multiple = false} ->
378+
receive_many(DTags -- [DTag])
379+
after 5000 ->
380+
throw(missing_ack)
381+
end.

0 commit comments

Comments
 (0)