Skip to content

Commit 4492818

Browse files
committed
rabbit_fifo: cancel should not remove single active consumer
This change keeps a cancelled single active consumer in the consuemrs map but with the cancelled status allowing another consumer to take over as the active one. [#164135123]
1 parent 3b0adfd commit 4492818

File tree

3 files changed

+88
-24
lines changed

3 files changed

+88
-24
lines changed

src/rabbit_fifo.erl

Lines changed: 32 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -784,6 +784,10 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
784784
{Consumer, Cons1} ->
785785
{S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0,
786786
Effects0, Reason),
787+
%% The effects are emitted before the consumer is actually removed
788+
%% if the consumer has unacked messages. This is a bit weird but
789+
%% in line with what classic queues do (from an external point of
790+
%% view)
787791
Effects = cancel_consumer_effects(ConsumerId, S, Effects2),
788792
case maps:size(S#?MODULE.consumers) of
789793
0 ->
@@ -796,32 +800,38 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
796800
{S0, Effects0}
797801
end.
798802

799-
activate_next_consumer(#?MODULE{consumers = Cons} = State0, Effects)
800-
when map_size(Cons) == 1 ->
801-
{State0, Effects};
802-
activate_next_consumer(#?MODULE{waiting_consumers = Waiting0} = State0,
803+
activate_next_consumer(#?MODULE{consumers = Cons,
804+
waiting_consumers = Waiting0} = State0,
803805
Effects0) ->
804-
case lists:filter(fun ({_, #consumer{status = Status}}) ->
805-
Status == up
806-
end, Waiting0) of
807-
[{NextConsumerId, NextConsumer} | _] ->
808-
Remaining = lists:keydelete(NextConsumerId, 1, Waiting0),
809-
#?MODULE{service_queue = ServiceQueue} = State0,
810-
ServiceQueue1 = maybe_queue_consumer(NextConsumerId,
811-
NextConsumer,
812-
ServiceQueue),
813-
State = State0#?MODULE{consumers = #{NextConsumerId => NextConsumer},
814-
service_queue = ServiceQueue1,
815-
waiting_consumers = Remaining},
816-
Effects = consumer_update_active_effects(State, NextConsumerId,
817-
NextConsumer, true,
818-
single_active, Effects0),
819-
{State, Effects};
820-
[] ->
821-
{State0, Effects0}
806+
case maps:filter(fun (_, #consumer{status = S}) -> S == up end, Cons) of
807+
Up when map_size(Up) == 0 ->
808+
%% there are no active consumer in the consumer map
809+
case lists:filter(fun ({_, #consumer{status = Status}}) ->
810+
Status == up
811+
end, Waiting0) of
812+
[{NextConsumerId, NextConsumer} | _] ->
813+
%% there is a potential next active consumer
814+
Remaining = lists:keydelete(NextConsumerId, 1, Waiting0),
815+
#?MODULE{service_queue = ServiceQueue} = State0,
816+
ServiceQueue1 = maybe_queue_consumer(NextConsumerId,
817+
NextConsumer,
818+
ServiceQueue),
819+
State = State0#?MODULE{consumers = Cons#{NextConsumerId => NextConsumer},
820+
service_queue = ServiceQueue1,
821+
waiting_consumers = Remaining},
822+
Effects = consumer_update_active_effects(State, NextConsumerId,
823+
NextConsumer, true,
824+
single_active, Effects0),
825+
{State, Effects};
826+
[] ->
827+
{State0, Effects0}
828+
end;
829+
_ ->
830+
{State0, Effects0}
822831
end.
823832

824833

834+
825835
maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer,
826836
Cons1, #?MODULE{consumers = C0,
827837
service_queue = SQ0} = S0,

src/rabbit_quorum_queue.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,8 @@ update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Act
190190
QName, Prefetch, Active, ActivityStatus, Args).
191191

192192
cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
193-
local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, [QName, ChPid, ConsumerTag]).
193+
local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer,
194+
[QName, ChPid, ConsumerTag]).
194195

195196
cancel_consumer(QName, ChPid, ConsumerTag) ->
196197
catch rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName),

test/rabbit_fifo_SUITE.erl

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1029,9 +1029,62 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co
10291029

10301030
{_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2),
10311031
% for each consumer: 1 effect to monitor the consumer PID
1032-
ct:pal("Effects3 ~w", [Effects3]),
10331032
?assertEqual(5, length(Effects3)).
10341033

1034+
single_active_cancelled_with_unacked_test(_) ->
1035+
State0 = init(#{name => ?FUNCTION_NAME,
1036+
queue_resource => rabbit_misc:r("/", queue,
1037+
atom_to_binary(?FUNCTION_NAME, utf8)),
1038+
release_cursor_interval => 0,
1039+
single_active_consumer_on => true}),
1040+
1041+
C1 = {<<"ctag1">>, self()},
1042+
C2 = {<<"ctag2">>, self()},
1043+
% adding some consumers
1044+
AddConsumer = fun(C, S0) ->
1045+
{S, _, _} = apply(
1046+
meta(1),
1047+
make_checkout(C,
1048+
{auto, 1, simple_prefetch},
1049+
#{}),
1050+
S0),
1051+
S
1052+
end,
1053+
State1 = lists:foldl(AddConsumer, State0, [C1, C2]),
1054+
1055+
%% enqueue 2 messages
1056+
{State2, _Effects2} = enq(3, 1, msg1, State1),
1057+
{State3, _Effects3} = enq(4, 2, msg2, State2),
1058+
%% one should be checked ou to C1
1059+
%% cancel C1
1060+
{State4, _, _} = apply(meta(5),
1061+
make_checkout(C1, cancel, #{}),
1062+
State3),
1063+
%% C2 should be the active consumer
1064+
?assertMatch(#{C2 := #consumer{status = up,
1065+
checked_out = #{0 := _}}},
1066+
State4#rabbit_fifo.consumers),
1067+
%% C1 should be a cancelled consumer
1068+
?assertMatch(#{C1 := #consumer{status = cancelled,
1069+
lifetime = once,
1070+
checked_out = #{0 := _}}},
1071+
State4#rabbit_fifo.consumers),
1072+
?assertMatch([], State4#rabbit_fifo.waiting_consumers),
1073+
1074+
%% Ack both messages
1075+
{State5, _Effects5} = settle(C1, 1, 0, State4),
1076+
%% C1 should now be cancelled
1077+
{State6, _Effects6} = settle(C2, 2, 0, State5),
1078+
1079+
%% C2 should remain
1080+
?assertMatch(#{C2 := #consumer{status = up}},
1081+
State6#rabbit_fifo.consumers),
1082+
%% C1 should be gone
1083+
?assertNotMatch(#{C1 := _},
1084+
State6#rabbit_fifo.consumers),
1085+
?assertMatch([], State6#rabbit_fifo.waiting_consumers),
1086+
ok.
1087+
10351088
meta(Idx) ->
10361089
#{index => Idx, term => 1}.
10371090

0 commit comments

Comments
 (0)