Skip to content

Commit be05f9a

Browse files
ansdkjnilsson
authored andcommitted
Add test for modified outcome with classic queue
1 parent 374980c commit be05f9a

File tree

5 files changed

+116
-49
lines changed

5 files changed

+116
-49
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1862,7 +1862,7 @@ settle_op_from_outcome(#'v1_0.rejected'{}) ->
18621862
settle_op_from_outcome(#'v1_0.released'{}) ->
18631863
requeue;
18641864

1865-
%% RabbitMQ does not support any of the modified outcome fields correctly.
1865+
%% Not all queue types support the modified outcome fields correctly.
18661866
%% However, we still allow the client to settle with the modified outcome
18671867
%% because some client libraries such as Apache QPid make use of it:
18681868
%% https://github.com/apache/qpid-jms/blob/90eb60f59cb59b7b9ad8363ee8a843d6903b8e77/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java#L464
@@ -1871,7 +1871,7 @@ settle_op_from_outcome(#'v1_0.released'{}) ->
18711871
settle_op_from_outcome(#'v1_0.modified'{delivery_failed = DelFailed,
18721872
undeliverable_here = UndelHere,
18731873
message_annotations = Anns0
1874-
}) ->
1874+
}) ->
18751875
Anns = case Anns0 of
18761876
#'v1_0.message_annotations'{content = C} ->
18771877
C;

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ cancel(Q, Spec, State) ->
333333
-spec settle(rabbit_amqqueue:name(), rabbit_queue_type:settle_op(),
334334
rabbit_types:ctag(), [non_neg_integer()], state()) ->
335335
{state(), rabbit_queue_type:actions()}.
336-
settle(QName, {modify, _DelFailed, Undel, _}, CTag, MsgIds, State) ->
336+
settle(QName, {modify, _DelFailed, Undel, _Anns}, CTag, MsgIds, State) ->
337337
%% translate modify into other op
338338
Op = case Undel of
339339
true ->

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -272,8 +272,8 @@ apply(Meta, #discard{consumer_key = ConsumerKey,
272272
end;
273273
apply(Meta, #return{consumer_key = ConsumerKey,
274274
msg_ids = MsgIds},
275-
#?STATE{consumers = Cons0} = State) ->
276-
case find_consumer(ConsumerKey, Cons0) of
275+
#?STATE{consumers = Cons} = State) ->
276+
case find_consumer(ConsumerKey, Cons) of
277277
{ActualConsumerKey, #consumer{checked_out = Checked}} ->
278278
return(Meta, ActualConsumerKey, MsgIds, false,
279279
#{}, Checked, [], State);
@@ -285,34 +285,34 @@ apply(Meta, #modify{consumer_key = ConsumerKey,
285285
undeliverable_here = Undel,
286286
annotations = Anns,
287287
msg_ids = MsgIds},
288-
#?STATE{consumers = Cons0} = State0) ->
289-
case find_consumer(ConsumerKey, Cons0) of
288+
#?STATE{consumers = Cons} = State) ->
289+
case find_consumer(ConsumerKey, Cons) of
290290
{ConsumerKey, #consumer{checked_out = Checked}}
291291
when Undel == false ->
292292
return(Meta, ConsumerKey, MsgIds, DelFailed,
293-
Anns, Checked, [], State0);
293+
Anns, Checked, [], State);
294294
{ConsumerKey, #consumer{} = Con}
295295
when Undel == true ->
296-
discard(Meta, MsgIds, ConsumerKey, Con, DelFailed, Anns, State0);
296+
discard(Meta, MsgIds, ConsumerKey, Con, DelFailed, Anns, State);
297297
_ ->
298-
{State0, ok}
298+
{State, ok}
299299
end;
300300
apply(#{index := Idx} = Meta,
301301
#requeue{consumer_key = ConsumerKey,
302302
msg_id = MsgId,
303303
index = OldIdx,
304304
header = Header0},
305-
#?STATE{consumers = Cons0,
305+
#?STATE{consumers = Cons,
306306
messages = Messages,
307307
ra_indexes = Indexes0,
308308
enqueue_count = EnqCount} = State00) ->
309309
%% the actual consumer key was looked up in the aux handler so we
310310
%% dont need to use find_consumer/2 here
311-
case Cons0 of
311+
case Cons of
312312
#{ConsumerKey := #consumer{checked_out = Checked0} = Con0}
313313
when is_map_key(MsgId, Checked0) ->
314314
%% construct a message with the current raft index
315-
%% and update delivery count before adding it to the message queue
315+
%% and update acquired count before adding it to the message queue
316316
Header = update_header(acquired_count, fun incr/1, 1, Header0),
317317
State0 = add_bytes_return(Header, State00),
318318
Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked0),
@@ -660,9 +660,7 @@ convert_v3_to_v4(#{} = _Meta, StateV3) ->
660660
(_, Msg) ->
661661
Msg
662662
end, Ch0),
663-
C#consumer{checked_out = Ch};
664-
(_, Msg) ->
665-
Msg
663+
C#consumer{checked_out = Ch}
666664
end, Consumers0),
667665
Returns = lqueue:from_list(
668666
lists:map(fun (?MSG(I, #{delivery_count := DC} = H)) ->
@@ -696,7 +694,7 @@ purge_node(Meta, Node, State, Effects) ->
696694
end, {State, Effects},
697695
all_pids_for(Node, State)).
698696

699-
%% any downs that re not noconnection
697+
%% any downs that are not noconnection
700698
handle_down(Meta, Pid, #?STATE{consumers = Cons0,
701699
enqueuers = Enqs0} = State0) ->
702700
% Remove any enqueuer for the down pid
@@ -1827,11 +1825,10 @@ annotate_msg(Header, Msg0) ->
18271825
Msg = maps:fold(fun (K, V, Acc) ->
18281826
mc:set_annotation(K, V, Acc)
18291827
end, Msg0, maps:get(anns, Header, #{})),
1830-
case is_map_key(delivery_count, Header) of
1831-
true ->
1832-
mc:set_annotation(delivery_count,
1833-
maps:get(delivery_count, Header), Msg);
1834-
false ->
1828+
case Header of
1829+
#{delivery_count := DelCount} ->
1830+
mc:set_annotation(delivery_count, DelCount, Msg);
1831+
_ ->
18351832
Msg
18361833
end;
18371834
_ ->
@@ -1849,7 +1846,7 @@ return_one(Meta, MsgId, ?MSG(_, _) = Msg0, DelivFailed, Anns,
18491846
Msg = incr_msg(Msg0, DelivFailed, Anns),
18501847
Header = get_msg_header(Msg),
18511848
case get_header(acquired_count, Header) of
1852-
DeliveryCount when DeliveryCount > DeliveryLimit ->
1849+
AcquiredCount when AcquiredCount > DeliveryLimit ->
18531850
{DlxState, DlxEffects} =
18541851
rabbit_fifo_dlx:discard([Msg], delivery_limit, DLH, DlxState0),
18551852
State1 = State0#?STATE{dlx = DlxState},

deps/rabbit/src/rabbit_fifo_dlx.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -362,5 +362,5 @@ dehydrate(State) ->
362362
smallest_raft_index(#?MODULE{ra_indexes = Indexes}) ->
363363
rabbit_fifo_index:smallest(Indexes).
364364

365-
annotate_msg(H, M0) ->
366-
rabbit_fifo:annotate_msg(H, M0).
365+
annotate_msg(H, Msg) ->
366+
rabbit_fifo:annotate_msg(H, Msg).

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 94 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ groups() ->
4141
[
4242
reliable_send_receive_with_outcomes_classic_queue,
4343
reliable_send_receive_with_outcomes_quorum_queue,
44-
modified,
44+
modified_classic_queue,
45+
modified_quorum_queue,
4546
sender_settle_mode_unsettled,
4647
sender_settle_mode_unsettled_fanout,
4748
sender_settle_mode_mixed,
@@ -403,11 +404,59 @@ reliable_send_receive(QType, Outcome, Config) ->
403404
ok = end_session_sync(Session2),
404405
ok = amqp10_client:close_connection(Connection2).
405406

406-
%% This test case doesn't expect the correct AMQP spec behavivour.
407-
%% We know that RabbitMQ doesn't implement the modified outcome correctly.
408-
%% Here, we test RabbitMQ's workaround behaviour:
409-
%% RabbitMQ discards if undeliverable-here is true. Otherwise, RabbitMQ requeues.
410-
modified(Config) ->
407+
%% We test the modified outcome with classic queues.
408+
%% We expect that classic queues implement field undeliverable-here incorrectly
409+
%% by discarding (if true) or requeueing (if false).
410+
%% Fields delivery-failed and message-annotations are not implemented.
411+
modified_classic_queue(Config) ->
412+
QName = atom_to_binary(?FUNCTION_NAME),
413+
{Connection, Session, LinkPair} = init(Config),
414+
{ok, #{type := <<"classic">>}} = rabbitmq_amqp_client:declare_queue(
415+
LinkPair, QName,
416+
#{arguments => #{<<"x-queue-type">> => {utf8, <<"classic">>}}}),
417+
Address = rabbitmq_amqp_address:queue(QName),
418+
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address),
419+
ok = wait_for_credit(Sender),
420+
421+
Msg1 = amqp10_msg:new(<<"tag1">>, <<"m1">>, true),
422+
Msg2 = amqp10_msg:new(<<"tag2">>, <<"m2">>, true),
423+
ok = amqp10_client:send_msg(Sender, Msg1),
424+
ok = amqp10_client:send_msg(Sender, Msg2),
425+
ok = amqp10_client:detach_link(Sender),
426+
427+
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled),
428+
429+
{ok, M1} = amqp10_client:get_msg(Receiver),
430+
?assertEqual([<<"m1">>], amqp10_msg:body(M1)),
431+
ok = amqp10_client:settle_msg(Receiver, M1, {modified, false, true, #{}}),
432+
433+
{ok, M2a} = amqp10_client:get_msg(Receiver),
434+
?assertEqual([<<"m2">>], amqp10_msg:body(M2a)),
435+
ok = amqp10_client:settle_msg(Receiver, M2a,
436+
{modified, false, false, #{}}),
437+
438+
{ok, M2b} = amqp10_client:get_msg(Receiver),
439+
?assertEqual([<<"m2">>], amqp10_msg:body(M2b)),
440+
ok = amqp10_client:settle_msg(Receiver, M2b,
441+
{modified, true, false, #{<<"x-opt-key">> => <<"val">>}}),
442+
443+
{ok, M2c} = amqp10_client:get_msg(Receiver),
444+
?assertEqual([<<"m2">>], amqp10_msg:body(M2c)),
445+
ok = amqp10_client:settle_msg(Receiver, M2c, modified),
446+
447+
ok = amqp10_client:detach_link(Receiver),
448+
?assertMatch({ok, #{message_count := 1}},
449+
rabbitmq_amqp_client:delete_queue(LinkPair, QName)),
450+
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
451+
ok = end_session_sync(Session),
452+
ok = amqp10_client:close_connection(Connection).
453+
454+
%% We test the modified outcome with quorum queues.
455+
%% We expect that quorum queues implement field
456+
%% * delivery-failed correctly
457+
%% * undeliverable-here incorrectly by discarding (if true) or requeueing (if false)
458+
%% * message-annotations correctly
459+
modified_quorum_queue(Config) ->
411460
QName = atom_to_binary(?FUNCTION_NAME),
412461
{Connection, Session, LinkPair} = init(Config),
413462
{ok, #{type := <<"quorum">>}} = rabbitmq_amqp_client:declare_queue(
@@ -427,34 +476,53 @@ modified(Config) ->
427476

428477
{ok, M1} = amqp10_client:get_msg(Receiver),
429478
?assertEqual([<<"m1">>], amqp10_msg:body(M1)),
430-
?assertEqual(0, amqp10_msg:header(delivery_count, M1)),
479+
?assertMatch(#{delivery_count := 0,
480+
first_acquirer := true},
481+
amqp10_msg:headers(M1)),
482+
ok = amqp10_client:settle_msg(Receiver, M1, {modified, false, true, #{}}),
431483

432-
ok = amqp10_client:settle_msg(Receiver, M1, {modified, false,
433-
_UndeliverableHere = true, #{}}),
434484
{ok, M2a} = amqp10_client:get_msg(Receiver),
435485
?assertEqual([<<"m2">>], amqp10_msg:body(M2a)),
436-
?assertEqual(0, amqp10_msg:header(delivery_count, M2a)),
437-
486+
?assertMatch(#{delivery_count := 0,
487+
first_acquirer := true},
488+
amqp10_msg:headers(M2a)),
438489
ok = amqp10_client:settle_msg(Receiver, M2a, {modified, false, false, #{}}),
490+
439491
{ok, M2b} = amqp10_client:get_msg(Receiver),
440492
?assertEqual([<<"m2">>], amqp10_msg:body(M2b)),
441-
?assertEqual(0, amqp10_msg:header(delivery_count, M2b)),
442-
493+
?assertMatch(#{delivery_count := 0,
494+
first_acquirer := false},
495+
amqp10_msg:headers(M2b)),
443496
ok = amqp10_client:settle_msg(Receiver, M2b, {modified, true, false, #{}}),
497+
444498
{ok, M2c} = amqp10_client:get_msg(Receiver),
445-
?assertEqual(1, amqp10_msg:header(delivery_count, M2c)),
499+
?assertMatch(#{delivery_count := 1,
500+
first_acquirer := false},
501+
amqp10_msg:headers(M2c)),
446502
?assertEqual([<<"m2">>], amqp10_msg:body(M2c)),
447-
448-
449503
ok = amqp10_client:settle_msg(Receiver, M2c,
450504
{modified, true, false,
451-
#{<<"x-opt-key">> => <<"val">>}}),
505+
#{<<"x-opt-key">> => <<"val 1">>}}),
452506

453507
{ok, M2d} = amqp10_client:get_msg(Receiver),
454508
?assertEqual([<<"m2">>], amqp10_msg:body(M2d)),
455-
?assertEqual(2, amqp10_msg:header(delivery_count, M2d)),
456-
?assertMatch(#{<<"x-opt-key">> := <<"val">>}, amqp10_msg:message_annotations(M2d)),
457-
ok = amqp10_client:settle_msg(Receiver, M2d, modified),
509+
?assertMatch(#{delivery_count := 2,
510+
first_acquirer := false},
511+
amqp10_msg:headers(M2d)),
512+
?assertMatch(#{<<"x-opt-key">> := <<"val 1">>}, amqp10_msg:message_annotations(M2d)),
513+
ok = amqp10_client:settle_msg(Receiver, M2d,
514+
{modified, false, false,
515+
#{<<"x-opt-key">> => <<"val 2">>,
516+
<<"x-other">> => 99}}),
517+
518+
{ok, M2e} = amqp10_client:get_msg(Receiver),
519+
?assertEqual([<<"m2">>], amqp10_msg:body(M2e)),
520+
?assertMatch(#{delivery_count := 2,
521+
first_acquirer := false},
522+
amqp10_msg:headers(M2e)),
523+
?assertMatch(#{<<"x-opt-key">> := <<"val 2">>,
524+
<<"x-other">> := 99}, amqp10_msg:message_annotations(M2e)),
525+
ok = amqp10_client:settle_msg(Receiver, M2e, modified),
458526

459527
ok = amqp10_client:detach_link(Receiver),
460528
?assertMatch({ok, #{message_count := 1}},
@@ -4429,6 +4497,7 @@ dead_letter_reject(Config) ->
44294497
QName1,
44304498
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>},
44314499
<<"x-message-ttl">> => {ulong, 20},
4500+
<<"x-overflow">> => {utf8, <<"reject-publish">>},
44324501
<<"x-dead-letter-strategy">> => {utf8, <<"at-least-once">>},
44334502
<<"x-dead-letter-exchange">> => {utf8, <<>>},
44344503
<<"x-dead-letter-routing-key">> => {utf8, QName2}
@@ -4463,22 +4532,22 @@ dead_letter_reject(Config) ->
44634532
?assertMatch(#{delivery_count := 0,
44644533
first_acquirer := true}, amqp10_msg:headers(Msg1)),
44654534
ok = amqp10_client:settle_msg(Receiver, Msg1, rejected),
4535+
44664536
{ok, Msg2} = amqp10_client:get_msg(Receiver),
44674537
?assertMatch(#{delivery_count := 1,
44684538
first_acquirer := false}, amqp10_msg:headers(Msg2)),
44694539
ok = amqp10_client:settle_msg(Receiver, Msg2,
44704540
{modified, true, true,
44714541
#{<<"x-opt-thekey">> => <<"val">>}}),
4542+
44724543
{ok, Msg3} = amqp10_client:get_msg(Receiver),
44734544
?assertMatch(#{delivery_count := 2,
44744545
first_acquirer := false}, amqp10_msg:headers(Msg3)),
4475-
?assertMatch(#{<<"x-opt-thekey">> := <<"val">>},
4476-
amqp10_msg:message_annotations(Msg3)),
4477-
ok = amqp10_client:settle_msg(Receiver, Msg3, accepted),
44784546
?assertEqual(Body, amqp10_msg:body_bin(Msg3)),
44794547
Annotations = amqp10_msg:message_annotations(Msg3),
44804548
?assertMatch(
4481-
#{<<"x-first-death-queue">> := QName1,
4549+
#{<<"x-opt-thekey">> := <<"val">>,
4550+
<<"x-first-death-queue">> := QName1,
44824551
<<"x-first-death-exchange">> := <<>>,
44834552
<<"x-first-death-reason">> := <<"expired">>,
44844553
<<"x-last-death-queue">> := QName1,
@@ -4516,6 +4585,7 @@ dead_letter_reject(Config) ->
45164585
]} = D3,
45174586
?assertEqual([Ts1, Ts3, Ts5, Ts4, Ts6, Ts2],
45184587
lists:sort([Ts1, Ts2, Ts3, Ts4, Ts5, Ts6])),
4588+
ok = amqp10_client:settle_msg(Receiver, Msg3, accepted),
45194589

45204590
ok = amqp10_client:detach_link(Receiver),
45214591
ok = amqp10_client:detach_link(Sender),

0 commit comments

Comments
 (0)