Skip to content

Commit 6c07e70

Browse files
Merge pull request #12442 from rabbitmq/gh_12424
QQ: fix bug with discards using a consumer_id()
2 parents 2a1dbd0 + 2339401 commit 6c07e70

File tree

3 files changed

+75
-49
lines changed

3 files changed

+75
-49
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -265,15 +265,27 @@ apply(Meta, #settle{msg_ids = MsgIds,
265265
_ ->
266266
{State, ok}
267267
end;
268-
apply(Meta, #discard{consumer_key = ConsumerKey,
269-
msg_ids = MsgIds},
268+
apply(#{machine_version := 4} = Meta,
269+
#discard{consumer_key = ConsumerKey,
270+
msg_ids = MsgIds},
270271
#?STATE{consumers = Consumers } = State0) ->
272+
%% buggy version that would have not found the consumer if the ConsumerKey
273+
%% was a consumer_id()
271274
case find_consumer(ConsumerKey, Consumers) of
272275
{ConsumerKey, #consumer{} = Con} ->
273276
discard(Meta, MsgIds, ConsumerKey, Con, true, #{}, State0);
274277
_ ->
275278
{State0, ok}
276279
end;
280+
apply(Meta, #discard{consumer_key = ConsumerKey,
281+
msg_ids = MsgIds},
282+
#?STATE{consumers = Consumers } = State0) ->
283+
case find_consumer(ConsumerKey, Consumers) of
284+
{ActualConsumerKey, #consumer{} = Con} ->
285+
discard(Meta, MsgIds, ActualConsumerKey, Con, true, #{}, State0);
286+
_ ->
287+
{State0, ok}
288+
end;
277289
apply(Meta, #return{consumer_key = ConsumerKey,
278290
msg_ids = MsgIds},
279291
#?STATE{consumers = Cons} = State) ->
@@ -291,13 +303,14 @@ apply(Meta, #modify{consumer_key = ConsumerKey,
291303
msg_ids = MsgIds},
292304
#?STATE{consumers = Cons} = State) ->
293305
case find_consumer(ConsumerKey, Cons) of
294-
{ConsumerKey, #consumer{checked_out = Checked}}
306+
{ActualConsumerKey, #consumer{checked_out = Checked}}
295307
when Undel == false ->
296-
return(Meta, ConsumerKey, MsgIds, DelFailed,
308+
return(Meta, ActualConsumerKey, MsgIds, DelFailed,
297309
Anns, Checked, [], State);
298-
{ConsumerKey, #consumer{} = Con}
310+
{ActualConsumerKey, #consumer{} = Con}
299311
when Undel == true ->
300-
discard(Meta, MsgIds, ConsumerKey, Con, DelFailed, Anns, State);
312+
discard(Meta, MsgIds, ActualConsumerKey,
313+
Con, DelFailed, Anns, State);
301314
_ ->
302315
{State, ok}
303316
end;
@@ -898,13 +911,14 @@ get_checked_out(CKey, From, To, #?STATE{consumers = Consumers}) ->
898911
end.
899912

900913
-spec version() -> pos_integer().
901-
version() -> 4.
914+
version() -> 5.
902915

903916
which_module(0) -> rabbit_fifo_v0;
904917
which_module(1) -> rabbit_fifo_v1;
905918
which_module(2) -> rabbit_fifo_v3;
906919
which_module(3) -> rabbit_fifo_v3;
907-
which_module(4) -> ?MODULE.
920+
which_module(4) -> ?MODULE;
921+
which_module(5) -> ?MODULE.
908922

909923
-define(AUX, aux_v3).
910924

@@ -2520,15 +2534,15 @@ make_checkout({_, _} = ConsumerId, Spec0, Meta) ->
25202534
make_settle(ConsumerKey, MsgIds) when is_list(MsgIds) ->
25212535
#settle{consumer_key = ConsumerKey, msg_ids = MsgIds}.
25222536

2523-
-spec make_return(consumer_id(), [msg_id()]) -> protocol().
2537+
-spec make_return(consumer_key(), [msg_id()]) -> protocol().
25242538
make_return(ConsumerKey, MsgIds) ->
25252539
#return{consumer_key = ConsumerKey, msg_ids = MsgIds}.
25262540

25272541
-spec is_return(protocol()) -> boolean().
25282542
is_return(Command) ->
25292543
is_record(Command, return).
25302544

2531-
-spec make_discard(consumer_id(), [msg_id()]) -> protocol().
2545+
-spec make_discard(consumer_key(), [msg_id()]) -> protocol().
25322546
make_discard(ConsumerKey, MsgIds) ->
25332547
#discard{consumer_key = ConsumerKey, msg_ids = MsgIds}.
25342548

@@ -2701,7 +2715,10 @@ convert(Meta, 1, To, State) ->
27012715
convert(Meta, 2, To, State) ->
27022716
convert(Meta, 3, To, rabbit_fifo_v3:convert_v2_to_v3(State));
27032717
convert(Meta, 3, To, State) ->
2704-
convert(Meta, 4, To, convert_v3_to_v4(Meta, State)).
2718+
convert(Meta, 4, To, convert_v3_to_v4(Meta, State));
2719+
convert(Meta, 4, To, State) ->
2720+
%% no conversion needed, this version only includes a logic change
2721+
convert(Meta, 5, To, State).
27052722

27062723
smallest_raft_index(#?STATE{messages = Messages,
27072724
ra_indexes = Indexes,

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ all_tests() ->
174174
per_message_ttl_expiration_too_high,
175175
consumer_priorities,
176176
cancel_consumer_gh_3729,
177+
cancel_consumer_gh_12424,
177178
cancel_and_consume_with_same_tag,
178179
validate_messages_on_queue,
179180
amqpl_headers,
@@ -3600,6 +3601,37 @@ cancel_consumer_gh_3729(Config) ->
36003601

36013602
ok = rabbit_ct_client_helpers:close_channel(Ch).
36023603

3604+
cancel_consumer_gh_12424(Config) ->
3605+
QQ = ?config(queue_name, Config),
3606+
3607+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
3608+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
3609+
3610+
ExpectedDeclareRslt0 = #'queue.declare_ok'{queue = QQ, message_count = 0, consumer_count = 0},
3611+
DeclareRslt0 = declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
3612+
?assertMatch(ExpectedDeclareRslt0, DeclareRslt0),
3613+
3614+
ok = publish(Ch, QQ),
3615+
3616+
ok = subscribe(Ch, QQ, false),
3617+
3618+
DeliveryTag = receive
3619+
{#'basic.deliver'{delivery_tag = DT}, _} ->
3620+
DT
3621+
after 5000 ->
3622+
flush(100),
3623+
ct:fail("basic.deliver timeout")
3624+
end,
3625+
3626+
ok = cancel(Ch),
3627+
3628+
R = #'basic.reject'{delivery_tag = DeliveryTag, requeue = false},
3629+
ok = amqp_channel:cast(Ch, R),
3630+
wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),
3631+
3632+
ok.
3633+
3634+
%% Test the scenario where a message is published to a quorum queue
36033635
cancel_and_consume_with_same_tag(Config) ->
36043636
%% https://github.com/rabbitmq/rabbitmq-server/issues/5927
36053637
QQ = ?config(queue_name, Config),

deps/rabbit/test/rabbit_fifo_SUITE.erl

Lines changed: 15 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,12 @@ groups() ->
4242
].
4343

4444
init_per_group(tests, Config) ->
45-
[{machine_version, 4} | Config];
45+
[{machine_version, 5} | Config];
4646
init_per_group(machine_version_conversion, Config) ->
4747
Config.
4848

4949
init_per_testcase(_Testcase, Config) ->
50-
FF = ?config(machine_version, Config) == 4,
50+
FF = ?config(machine_version, Config) == 5,
5151
ok = meck:new(rabbit_feature_flags, [passthrough]),
5252
meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> FF end),
5353
Config.
@@ -804,6 +804,19 @@ discarded_message_with_dead_letter_handler_emits_log_effect_test(Config) ->
804804

805805
ok.
806806

807+
discard_after_cancel_test(Config) ->
808+
Cid = {?FUNCTION_NAME_B, self()},
809+
{State0, _} = enq(Config, 1, 1, first, test_init(test)),
810+
{State1, #{key := _CKey,
811+
next_msg_id := MsgId}, _Effects1} =
812+
checkout(Config, ?LINE, Cid, 10, State0),
813+
{State2, _, _} = apply(meta(Config, ?LINE),
814+
rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
815+
{State, _, _} = apply(meta(Config, ?LINE),
816+
rabbit_fifo:make_discard(Cid, [MsgId]), State2),
817+
ct:pal("State ~p", [State]),
818+
ok.
819+
807820
enqueued_msg_with_delivery_count_test(Config) ->
808821
State00 = init(#{name => test,
809822
queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
@@ -2786,45 +2799,9 @@ modify_test(Config) ->
27862799

27872800
ok.
27882801

2789-
ttb_test(Config) ->
2790-
S0 = init(#{name => ?FUNCTION_NAME,
2791-
queue_resource =>
2792-
rabbit_misc:r("/", queue, ?FUNCTION_NAME_B)}),
2793-
2794-
2795-
S1 = do_n(5_000_000,
2796-
fun (N, Acc) ->
2797-
I = (5_000_000 - N),
2798-
element(1, enq(Config, I, I, ?FUNCTION_NAME_B, Acc))
2799-
end, S0),
2800-
2801-
2802-
2803-
{T1, _Res} = timer:tc(fun () ->
2804-
do_n(100, fun (_, S) ->
2805-
term_to_binary(S),
2806-
S1 end, S1)
2807-
end),
2808-
ct:pal("T1 took ~bus", [T1]),
2809-
2810-
2811-
{T2, _} = timer:tc(fun () ->
2812-
do_n(100, fun (_, S) -> term_to_iovec(S), S1 end, S1)
2813-
end),
2814-
ct:pal("T2 took ~bus", [T2]),
2815-
2816-
ok.
2817-
28182802
%% Utility
28192803
%%
28202804

2821-
do_n(0, _, A) ->
2822-
A;
2823-
do_n(N, Fun, A0) ->
2824-
A = Fun(N, A0),
2825-
do_n(N-1, Fun, A).
2826-
2827-
28282805
init(Conf) -> rabbit_fifo:init(Conf).
28292806
make_register_enqueuer(Pid) -> rabbit_fifo:make_register_enqueuer(Pid).
28302807
apply(Meta, Entry, State) -> rabbit_fifo:apply(Meta, Entry, State).

0 commit comments

Comments
 (0)