Skip to content

Commit 4fc8565

Browse files
committed
QQ: track number of requeues for message.
To be able to calculate the correct value for the AMQP delivery_count header we need to be able to distinguish between messages that were "released" or returned in QQ speak and those that were returned due to errors such as channel termination. This commit implement such tracking as well as the calculation of a new mc annotations `delivery_count` that AMQP makes use of to set the header value accordingly.
1 parent 4a71509 commit 4fc8565

File tree

10 files changed

+290
-137
lines changed

10 files changed

+290
-137
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -698,6 +698,9 @@ rabbitmq_suite(
698698
rabbitmq_suite(
699699
name = "rabbit_fifo_int_SUITE",
700700
size = "medium",
701+
additional_beam = [
702+
":test_test_util_beam",
703+
],
701704
deps = [
702705
"//deps/rabbit_common:erlang_app",
703706
"@aten//:erlang_app",

deps/rabbit/app.bzl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1341,7 +1341,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
13411341
outs = ["test/rabbit_fifo_int_SUITE.beam"],
13421342
app_name = "rabbit",
13431343
erlc_opts = "//:test_erlc_opts",
1344-
deps = ["//deps/rabbit_common:erlang_app"],
1344+
deps = ["//deps/rabbitmq_ct_helpers:erlang_app"],
13451345
)
13461346
erlang_bytecode(
13471347
name = "rabbit_fifo_prop_SUITE_beam_files",

deps/rabbit/src/mc_amqp.erl

Lines changed: 12 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -222,14 +222,7 @@ get_property(priority, Msg) ->
222222
-spec protocol_state(state(), mc:annotations()) -> iolist().
223223
protocol_state(Msg0 = #msg_body_decoded{header = Header0,
224224
message_annotations = MA0}, Anns) ->
225-
FirstAcquirer = first_acquirer(Anns),
226-
Header = case Header0 of
227-
undefined ->
228-
#'v1_0.header'{durable = true,
229-
first_acquirer = FirstAcquirer};
230-
#'v1_0.header'{} ->
231-
Header0#'v1_0.header'{first_acquirer = FirstAcquirer}
232-
end,
225+
Header = update_header_from_anns(Header0, Anns),
233226
MA = protocol_state_message_annotations(MA0, Anns),
234227
Msg = Msg0#msg_body_decoded{header = Header,
235228
message_annotations = MA},
@@ -238,14 +231,7 @@ protocol_state(Msg0 = #msg_body_decoded{header = Header0,
238231
protocol_state(#msg_body_encoded{header = Header0,
239232
message_annotations = MA0,
240233
bare_and_footer = BareAndFooter}, Anns) ->
241-
FirstAcquirer = first_acquirer(Anns),
242-
Header = case Header0 of
243-
undefined ->
244-
#'v1_0.header'{durable = true,
245-
first_acquirer = FirstAcquirer};
246-
#'v1_0.header'{} ->
247-
Header0#'v1_0.header'{first_acquirer = FirstAcquirer}
248-
end,
234+
Header = update_header_from_anns(Header0, Anns),
249235
MA = protocol_state_message_annotations(MA0, Anns),
250236
Sections = to_sections(Header, MA, []),
251237
[encode(Sections), BareAndFooter];
@@ -269,10 +255,9 @@ protocol_state(#v1{message_annotations = MA0,
269255
_ ->
270256
undefined
271257
end,
272-
Header = #'v1_0.header'{durable = Durable,
273-
priority = Priority,
274-
ttl = Ttl,
275-
first_acquirer = first_acquirer(Anns)},
258+
Header = update_header_from_anns(#'v1_0.header'{durable = Durable,
259+
priority = Priority,
260+
ttl = Ttl}, Anns),
276261
MA = protocol_state_message_annotations(MA0, Anns),
277262
Sections = to_sections(Header, MA, []),
278263
[encode(Sections), BareAndFooter].
@@ -573,13 +558,13 @@ msg_body_encoded([{{pos, Pos}, {body, Code}}], BarePos, Msg)
573558
binary_part_bare_and_footer(Payload, Start) ->
574559
binary_part(Payload, Start, byte_size(Payload) - Start).
575560

576-
-spec first_acquirer(mc:annotations()) -> boolean().
577-
first_acquirer(Anns) ->
578-
Redelivered = case Anns of
579-
#{redelivered := R} -> R;
580-
_ -> false
581-
end,
582-
not Redelivered.
561+
update_header_from_anns(undefined, Anns) ->
562+
update_header_from_anns(#'v1_0.header'{durable = true}, Anns);
563+
update_header_from_anns(Header, Anns) ->
564+
FirstAcq = not maps:get(redelivered, Anns, false),
565+
DeliveryCount = {uint, maps:get(delivery_count, Anns, 0)},
566+
Header#'v1_0.header'{first_acquirer = FirstAcq,
567+
delivery_count = DeliveryCount}.
583568

584569
encode_deaths(Deaths) ->
585570
lists:map(

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@
9696
-ifdef(TEST).
9797
-export([update_header/4,
9898
chunk_disk_msgs/3,
99-
smallest_raft_index/1]).
99+
smallest_raft_index/1,
100+
make_requeue/4]).
100101
-endif.
101102

102103
-import(serial_number, [add/2, diff/2]).
@@ -308,7 +309,8 @@ apply(#{index := Idx} = Meta,
308309
when is_map_key(MsgId, Checked0) ->
309310
%% construct a message with the current raft index
310311
%% and update delivery count before adding it to the message queue
311-
Header = update_header(delivery_count, fun incr/1, 1, Header0),
312+
Header1 = update_header(delivery_count, fun incr/1, 1, Header0),
313+
Header = update_header(return_count, fun incr/1, 1, Header1),
312314
State0 = add_bytes_return(Header, State00),
313315
Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked0),
314316
credit = increase_credit(Con0, 1)},
@@ -1745,17 +1747,19 @@ maybe_enqueue(RaftIdx, Ts, From, MsgSeqNo, RawMsg,
17451747
return(#{index := IncomingRaftIdx} = Meta,
17461748
ConsumerKey, MsgIds, Checked, Effects0, State0) ->
17471749
%% We requeue in the same order as messages got returned by the client.
1748-
{State1, Effects1} = lists:foldl(
1749-
fun(MsgId, Acc = {S0, E0}) ->
1750-
case Checked of
1751-
#{MsgId := Val} ->
1752-
{_At, Msg} = Val,
1753-
return_one(Meta, MsgId, Msg,
1754-
S0, E0, ConsumerKey);
1755-
#{} ->
1756-
Acc
1757-
end
1758-
end, {State0, Effects0}, MsgIds),
1750+
{State1, Effects1} =
1751+
lists:foldl(
1752+
fun(MsgId, Acc = {S0, E0}) ->
1753+
case Checked of
1754+
#{MsgId := {_At, Msg0}} ->
1755+
Msg = update_msg_header(return_count, fun incr/1, 1,
1756+
Msg0),
1757+
return_one(Meta, MsgId, Msg,
1758+
S0, E0, ConsumerKey);
1759+
#{} ->
1760+
Acc
1761+
end
1762+
end, {State0, Effects0}, MsgIds),
17591763
State2 = case State1#?STATE.consumers of
17601764
#{ConsumerKey := Con} ->
17611765
update_or_remove_con(Meta, ConsumerKey, Con, State1);
@@ -1904,11 +1908,13 @@ update_header(Key, UpdateFun, Default, Size)
19041908
when is_integer(Size) ->
19051909
update_header(Key, UpdateFun, Default, #{size => Size});
19061910
update_header(Key, UpdateFun, Default, ?TUPLE(Size, Expiry))
1907-
when is_integer(Size), is_integer(Expiry) ->
1911+
when is_integer(Size) andalso
1912+
is_integer(Expiry) ->
19081913
update_header(Key, UpdateFun, Default, #{size => Size,
19091914
expiry => Expiry});
19101915
update_header(Key, UpdateFun, Default, Header)
1911-
when is_map(Header), is_map_key(size, Header) ->
1916+
when is_map(Header) andalso
1917+
is_map_key(size, Header) ->
19121918
maps:update_with(Key, UpdateFun, Default, Header).
19131919

19141920
get_msg_header(?MSG(_Idx, Header)) ->

deps/rabbit/src/rabbit_fifo.hrl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,13 @@
4242
optimised_tuple(msg_size(), Expiry :: milliseconds()) |
4343
#{size := msg_size(),
4444
delivery_count => non_neg_integer(),
45+
return_count => non_neg_integer(),
4546
expiry => milliseconds()}.
4647
%% The message header:
4748
%% size: The size of the message payload in bytes.
48-
%% delivery_count: the number of unsuccessful delivery attempts.
49+
%% delivery_count: The number of unsuccessful delivery attempts.
4950
%% A non-zero value indicates a previous attempt.
51+
%% return_count: The number of explicit returns.
5052
%% expiry: Epoch time in ms when a message expires. Set during enqueue.
5153
%% Value is determined by per-queue or per-message message TTL.
5254
%% If it contains only the size it can be condensed to an integer.

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -208,12 +208,7 @@ dequeue(QueueName, ConsumerTag, Settlement,
208208
{ok, {dequeue, empty}, Leader} ->
209209
{empty, State0#state{leader = Leader}};
210210
{ok, {dequeue, {MsgId, {MsgHeader, Msg0}}, MsgsReady}, Leader} ->
211-
Count = case MsgHeader of
212-
#{delivery_count := C} -> C;
213-
_ -> 0
214-
end,
215-
IsDelivered = Count > 0,
216-
Msg = add_delivery_count_header(Msg0, Count),
211+
{Msg, IsDelivered} = add_delivery_count_header(Msg0, MsgHeader),
217212
{ok, MsgsReady,
218213
{QueueName, qref(Leader), MsgId, IsDelivered, Msg},
219214
State0#state{leader = Leader}};
@@ -223,14 +218,21 @@ dequeue(QueueName, ConsumerTag, Settlement,
223218
Err
224219
end.
225220

226-
add_delivery_count_header(Msg, Count) ->
227-
case mc:is(Msg) of
228-
true when is_integer(Count) andalso
229-
Count > 0 ->
230-
mc:set_annotation(<<"x-delivery-count">>, Count, Msg);
221+
add_delivery_count_header(Msg, #{delivery_count := DelCount} = Header)
222+
when is_integer(DelCount) ->
223+
{case mc:is(Msg) of
224+
true ->
225+
%% the "delivery-count" header in the AMQP spec does not include
226+
%% returns (released outcomes)
227+
AmqpDelCount = DelCount - maps:get(return_count, Header, 0),
228+
mc:set_annotation(delivery_count, AmqpDelCount,
229+
mc:set_annotation(<<"x-delivery-count">>,
230+
DelCount, Msg));
231231
_ ->
232232
Msg
233-
end.
233+
end, DelCount > 0};
234+
add_delivery_count_header(Msg, _Header) ->
235+
{Msg, false}.
234236

235237

236238
%% @doc Settle a message. Permanently removes message from the queue.
@@ -840,13 +842,7 @@ handle_delivery(_QName, _Leader, {delivery, Tag, [_ | _] = IdMsgs},
840842
transform_msgs(QName, QRef, Msgs) ->
841843
lists:map(
842844
fun({MsgId, {MsgHeader, Msg0}}) ->
843-
{Msg, Redelivered} = case MsgHeader of
844-
#{delivery_count := C} ->
845-
{add_delivery_count_header(Msg0, C), true};
846-
_ ->
847-
{Msg0, false}
848-
end,
849-
845+
{Msg, Redelivered} = add_delivery_count_header(Msg0, MsgHeader),
850846
{QName, QRef, MsgId, Redelivered, Msg}
851847
end, Msgs).
852848

deps/rabbit/test/amqp_system_SUITE.erl

Lines changed: 58 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ groups() ->
3434
%% TODO at_most_once,
3535
reject,
3636
redelivery,
37+
released,
3738
routing,
3839
invalid_routes,
3940
auth_failure,
@@ -80,6 +81,13 @@ end_per_group(_, Config) ->
8081
rabbit_ct_broker_helpers:teardown_steps()).
8182

8283
init_per_testcase(Testcase, Config) ->
84+
enable_feature_flags(Config,
85+
[
86+
message_containers_store_amqp_v1,
87+
credit_api_v2,
88+
quorum_queues_v4
89+
% amqp_address_v1
90+
]),
8391
rabbit_ct_helpers:testcase_started(Config, Testcase).
8492

8593
end_per_testcase(Testcase, Config) ->
@@ -115,22 +123,20 @@ build_maven_test_project(Config) ->
115123
%% -------------------------------------------------------------------
116124

117125
roundtrip(Config) ->
126+
declare_queue(Config, ?FUNCTION_NAME, "quorum"),
118127
run(Config, [{dotnet, "roundtrip"},
119128
{java, "RoundTripTest"}]).
120129

121130
streams(Config) ->
122-
_ = rabbit_ct_broker_helpers:enable_feature_flag(Config,
123-
message_containers_store_amqp_v1),
124-
Ch = rabbit_ct_client_helpers:open_channel(Config),
125-
amqp_channel:call(Ch, #'queue.declare'{queue = <<"stream_q2">>,
126-
durable = true,
127-
arguments = [{<<"x-queue-type">>, longstr, "stream"}]}),
131+
declare_queue(Config, ?FUNCTION_NAME, "stream"),
128132
run(Config, [{dotnet, "streams"}]).
129133

130134
roundtrip_to_amqp_091(Config) ->
135+
declare_queue(Config, ?FUNCTION_NAME, "classic"),
131136
run(Config, [{dotnet, "roundtrip_to_amqp_091"}]).
132137

133138
default_outcome(Config) ->
139+
declare_queue(Config, ?FUNCTION_NAME, "classic"),
134140
run(Config, [{dotnet, "default_outcome"}]).
135141

136142
no_routes_is_released(Config) ->
@@ -140,28 +146,41 @@ no_routes_is_released(Config) ->
140146
run(Config, [{dotnet, "no_routes_is_released"}]).
141147

142148
outcomes(Config) ->
149+
declare_queue(Config, ?FUNCTION_NAME, "classic"),
143150
run(Config, [{dotnet, "outcomes"}]).
144151

145152
fragmentation(Config) ->
153+
declare_queue(Config, ?FUNCTION_NAME, "classic"),
146154
run(Config, [{dotnet, "fragmentation"}]).
147155

148156
message_annotations(Config) ->
157+
declare_queue(Config, ?FUNCTION_NAME, "classic"),
149158
run(Config, [{dotnet, "message_annotations"}]).
150159

151160
footer(Config) ->
161+
declare_queue(Config, ?FUNCTION_NAME, "classic"),
152162
run(Config, [{dotnet, "footer"}]).
153163

154164
data_types(Config) ->
165+
declare_queue(Config, ?FUNCTION_NAME, "classic"),
155166
run(Config, [{dotnet, "data_types"}]).
156167

157168
reject(Config) ->
169+
declare_queue(Config, ?FUNCTION_NAME, "classic"),
158170
run(Config, [{dotnet, "reject"}]).
159171

160172
redelivery(Config) ->
173+
declare_queue(Config, ?FUNCTION_NAME, "quorum"),
161174
run(Config, [{dotnet, "redelivery"}]).
162175

176+
released(Config) ->
177+
declare_queue(Config, ?FUNCTION_NAME, "quorum"),
178+
run(Config, [{dotnet, "released"}]).
179+
163180
routing(Config) ->
164181
Ch = rabbit_ct_client_helpers:open_channel(Config),
182+
amqp_channel:call(Ch, #'queue.declare'{queue = <<"test">>,
183+
durable = true}),
165184
amqp_channel:call(Ch, #'queue.declare'{queue = <<"transient_q">>,
166185
durable = false}),
167186
amqp_channel:call(Ch, #'queue.declare'{queue = <<"durable_q">>,
@@ -174,6 +193,18 @@ routing(Config) ->
174193
arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]}),
175194
amqp_channel:call(Ch, #'queue.declare'{queue = <<"autodel_q">>,
176195
auto_delete = true}),
196+
amqp_channel:call(Ch, #'queue.declare'{queue = <<"fanout_q">>,
197+
durable = false}),
198+
amqp_channel:call(Ch, #'queue.bind'{queue = <<"fanout_q">>,
199+
exchange = <<"amq.fanout">>
200+
}),
201+
amqp_channel:call(Ch, #'queue.declare'{queue = <<"direct_q">>,
202+
durable = false}),
203+
amqp_channel:call(Ch, #'queue.bind'{queue = <<"direct_q">>,
204+
exchange = <<"amq.direct">>,
205+
routing_key = <<"direct_q">>
206+
}),
207+
177208
run(Config, [
178209
{dotnet, "routing"}
179210
]).
@@ -227,6 +258,7 @@ run_dotnet_test(Config, Method) ->
227258
[
228259
{cd, TestProjectDir}
229260
]),
261+
ct:pal("~s: result ~p", [?FUNCTION_NAME, Ret]),
230262
{ok, _} = Ret.
231263

232264
run_java_test(Config, Class) ->
@@ -239,3 +271,23 @@ run_java_test(Config, Class) ->
239271
],
240272
[{cd, TestProjectDir}]),
241273
{ok, _} = Ret.
274+
275+
276+
enable_feature_flags(Config, Flags) ->
277+
[begin
278+
case rabbit_ct_broker_helpers:enable_feature_flag(Config, Flag) of
279+
ok -> ok;
280+
_ ->
281+
throw({skip, "feature flag ~s could not be enabled"})
282+
end
283+
end || Flag <- Flags].
284+
285+
declare_queue(Config, Name, Type) ->
286+
Ch = rabbit_ct_client_helpers:open_channel(Config),
287+
#'queue.declare_ok'{} =
288+
amqp_channel:call(Ch, #'queue.declare'{queue = atom_to_binary(Name, utf8),
289+
durable = true,
290+
arguments = [{<<"x-queue-type">>,
291+
longstr, Type}]}),
292+
rabbit_ct_client_helpers:close_channel(Ch),
293+
ok.

0 commit comments

Comments
 (0)