Skip to content

Commit d36dcce

Browse files
committed
QQ: add SAC consumer priority integration tests
1 parent c410fc1 commit d36dcce

File tree

2 files changed

+126
-4
lines changed

2 files changed

+126
-4
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1514,7 +1514,8 @@ activate_next_consumer(#?STATE{consumers = Cons,
15141514
{State0, Effects0}
15151515
end.
15161516

1517-
active_consumer({CKey, #consumer{status = up} = Consumer, _I}) ->
1517+
active_consumer({CKey, #consumer{status = Status} = Consumer, _I})
1518+
when Status == up orelse Status == fading ->
15181519
{CKey, Consumer};
15191520
active_consumer({_CKey, #consumer{status = _}, I}) ->
15201521
active_consumer(maps:next(I));

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 124 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,9 @@ groups() ->
9090
leader_locator_policy,
9191
status,
9292
format,
93-
add_member_2
93+
add_member_2,
94+
single_active_consumer_priority_take_over,
95+
single_active_consumer_priority
9496
]
9597
++ all_tests()},
9698
{cluster_size_5, [], [start_queue,
@@ -923,6 +925,7 @@ publish_confirm(Ch, QName, Timeout) ->
923925
ct:pal("NOT CONFIRMED! ~ts", [QName]),
924926
fail
925927
after Timeout ->
928+
flush(1),
926929
exit(confirm_timeout)
927930
end.
928931

@@ -971,6 +974,117 @@ consume_in_minority(Config) ->
971974
ok = rabbit_ct_broker_helpers:wait_for_async_start_node(Server2),
972975
ok.
973976

977+
single_active_consumer_priority_take_over(Config) ->
978+
[Server0, Server1, _Server2] =
979+
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
980+
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server0),
981+
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1),
982+
QName = ?config(queue_name, Config),
983+
Q1 = <<QName/binary, "_1">>,
984+
RaNameQ1 = binary_to_atom(<<"%2F", "_", Q1/binary>>, utf8),
985+
QueryFun = fun rabbit_fifo:query_single_active_consumer/1,
986+
Args = [{<<"x-queue-type">>, longstr, <<"quorum">>},
987+
{<<"x-single-active-consumer">>, bool, true}],
988+
?assertEqual({'queue.declare_ok', Q1, 0, 0}, declare(Ch1, Q1, Args)),
989+
ok = subscribe(Ch1, Q1, false, <<"ch1-ctag1">>, [{"x-priority", byte, 1}]),
990+
?assertMatch({ok, {_, {value, {<<"ch1-ctag1">>, _}}}, _},
991+
rpc:call(Server0, ra, local_query, [RaNameQ1, QueryFun])),
992+
#'confirm.select_ok'{} = amqp_channel:call(Ch2, #'confirm.select'{}),
993+
publish_confirm(Ch2, Q1),
994+
%% higher priority consumer attaches
995+
ok = subscribe(Ch2, Q1, false, <<"ch2-ctag1">>, [{"x-priority", byte, 3}]),
996+
997+
%% Q1 should still have Ch1 as consumer as it has pending messages
998+
?assertMatch({ok, {_, {value, {<<"ch1-ctag1">>, _}}}, _},
999+
rpc:call(Server0, ra, local_query,
1000+
[RaNameQ1, QueryFun])),
1001+
1002+
%% ack the message
1003+
receive
1004+
{#'basic.deliver'{consumer_tag = <<"ch1-ctag1">>,
1005+
delivery_tag = DeliveryTag}, _} ->
1006+
amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag,
1007+
multiple = false})
1008+
after 5000 ->
1009+
flush(1),
1010+
exit(basic_deliver_timeout)
1011+
end,
1012+
1013+
?awaitMatch({ok, {_, {value, {<<"ch2-ctag1">>, _}}}, _},
1014+
rpc:call(Server0, ra, local_query, [RaNameQ1, QueryFun]),
1015+
?DEFAULT_AWAIT),
1016+
ok.
1017+
1018+
single_active_consumer_priority(Config) ->
1019+
[Server0, Server1, Server2] =
1020+
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1021+
1022+
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server0),
1023+
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1),
1024+
Ch3 = rabbit_ct_client_helpers:open_channel(Config, Server2),
1025+
QName = ?config(queue_name, Config),
1026+
Q1 = <<QName/binary, "_1">>,
1027+
Q2 = <<QName/binary, "_2">>,
1028+
Q3 = <<QName/binary, "_3">>,
1029+
Args = [{<<"x-queue-type">>, longstr, <<"quorum">>},
1030+
{<<"x-single-active-consumer">>, bool, true}],
1031+
?assertEqual({'queue.declare_ok', Q1, 0, 0}, declare(Ch1, Q1, Args)),
1032+
?assertEqual({'queue.declare_ok', Q2, 0, 0}, declare(Ch2, Q2, Args)),
1033+
?assertEqual({'queue.declare_ok', Q3, 0, 0}, declare(Ch3, Q3, Args)),
1034+
1035+
ok = subscribe(Ch1, Q1, false, <<"ch1-ctag1">>, [{"x-priority", byte, 3}]),
1036+
ok = subscribe(Ch1, Q2, false, <<"ch1-ctag2">>, [{"x-priority", byte, 2}]),
1037+
ok = subscribe(Ch1, Q3, false, <<"ch1-ctag3">>, [{"x-priority", byte, 1}]),
1038+
1039+
1040+
ok = subscribe(Ch2, Q1, false, <<"ch2-ctag1">>, [{"x-priority", byte, 1}]),
1041+
ok = subscribe(Ch2, Q2, false, <<"ch2-ctag2">>, [{"x-priority", byte, 3}]),
1042+
ok = subscribe(Ch2, Q3, false, <<"ch2-ctag3">>, [{"x-priority", byte, 2}]),
1043+
1044+
ok = subscribe(Ch3, Q1, false, <<"ch3-ctag1">>, [{"x-priority", byte, 2}]),
1045+
ok = subscribe(Ch3, Q2, false, <<"ch3-ctag2">>, [{"x-priority", byte, 1}]),
1046+
ok = subscribe(Ch3, Q3, false, <<"ch3-ctag3">>, [{"x-priority", byte, 3}]),
1047+
1048+
1049+
RaNameQ1 = binary_to_atom(<<"%2F", "_", Q1/binary>>, utf8),
1050+
RaNameQ2 = binary_to_atom(<<"%2F", "_", Q2/binary>>, utf8),
1051+
RaNameQ3 = binary_to_atom(<<"%2F", "_", Q3/binary>>, utf8),
1052+
%% assert each queue has a different consumer
1053+
QueryFun = fun rabbit_fifo:query_single_active_consumer/1,
1054+
1055+
%% Q1 should have the consumer on Ch1
1056+
?assertMatch({ok, {_, {value, {<<"ch1-ctag1">>, _}}}, _},
1057+
rpc:call(Server0, ra, local_query, [RaNameQ1, QueryFun])),
1058+
1059+
%% Q2 Ch2
1060+
?assertMatch({ok, {_, {value, {<<"ch2-ctag2">>, _}}}, _},
1061+
rpc:call(Server1, ra, local_query, [RaNameQ2, QueryFun])),
1062+
1063+
%% Q3 Ch3
1064+
?assertMatch({ok, {_, {value, {<<"ch3-ctag3">>, _}}}, _},
1065+
rpc:call(Server2, ra, local_query, [RaNameQ3, QueryFun])),
1066+
1067+
%% close Ch3
1068+
_ = rabbit_ct_client_helpers:close_channel(Ch3),
1069+
flush(100),
1070+
1071+
%% assert Q3 has Ch2 (priority 2) as consumer
1072+
?assertMatch({ok, {_, {value, {<<"ch2-ctag3">>, _}}}, _},
1073+
rpc:call(Server2, ra, local_query, [RaNameQ3, QueryFun])),
1074+
1075+
%% close Ch2
1076+
_ = rabbit_ct_client_helpers:close_channel(Ch2),
1077+
flush(100),
1078+
1079+
%% assert all queues as has Ch1 as consumer
1080+
?assertMatch({ok, {_, {value, {<<"ch1-ctag1">>, _}}}, _},
1081+
rpc:call(Server0, ra, local_query, [RaNameQ1, QueryFun])),
1082+
?assertMatch({ok, {_, {value, {<<"ch1-ctag2">>, _}}}, _},
1083+
rpc:call(Server0, ra, local_query, [RaNameQ2, QueryFun])),
1084+
?assertMatch({ok, {_, {value, {<<"ch1-ctag3">>, _}}}, _},
1085+
rpc:call(Server0, ra, local_query, [RaNameQ3, QueryFun])),
1086+
ok.
1087+
9741088
reject_after_leader_transfer(Config) ->
9751089
[Server0, Server1, Server2] =
9761090
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -3623,13 +3737,20 @@ consume_empty(Ch, Queue, NoAck) ->
36233737
no_ack = NoAck})).
36243738

36253739
subscribe(Ch, Queue, NoAck) ->
3740+
subscribe(Ch, Queue, NoAck, <<"ctag">>, []).
3741+
3742+
subscribe(Ch, Queue, NoAck, Tag, Args) ->
36263743
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
36273744
no_ack = NoAck,
3628-
consumer_tag = <<"ctag">>},
3745+
arguments = Args,
3746+
consumer_tag = Tag},
36293747
self()),
36303748
receive
3631-
#'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
3749+
#'basic.consume_ok'{consumer_tag = Tag} ->
36323750
ok
3751+
after 30000 ->
3752+
flush(100),
3753+
exit(subscribe_timeout)
36333754
end.
36343755

36353756
qos(Ch, Prefetch, Global) ->

0 commit comments

Comments
 (0)