@@ -448,7 +448,7 @@ apply(#{index := RaftIdx}, #purge{},
448448 lists :reverse ([garbage_collection | Effects ])};
449449apply (_ , {down , ConsumerPid , noconnection },
450450 # state {consumers = Cons0 ,
451- enqueuers = Enqs0 } = State0 ) ->
451+ enqueuers = Enqs0 } = State0 ) ->
452452 Node = node (ConsumerPid ),
453453 % mark all consumers and enqueuers as suspected down
454454 % and monitor the node so that we can find out the final state of the
@@ -472,16 +472,19 @@ apply(_, {down, ConsumerPid, noconnection},
472472 E # enqueuer {suspected_down = true };
473473 (_ , E ) -> E
474474 end , Enqs0 ),
475+ % mark waiting consumers as suspected if necessary
476+ WaitingConsumers1 = maybe_mark_suspect_waiting_consumers (Node , State0 , true ),
477+
475478 Effects = case maps :size (Cons ) of
476479 0 ->
477480 [{aux , inactive }, {monitor , node , Node }];
478481 _ ->
479482 [{monitor , node , Node }]
480483 end ,
481484 % % TODO: should we run a checkout here?
482- {State # state {consumers = Cons , enqueuers = Enqs }, ok , Effects };
483- apply (_ , {down , Pid , _Info }, # state {consumers = Cons0 ,
484- enqueuers = Enqs0 } = State0 ) ->
485+ {State # state {consumers = Cons , enqueuers = Enqs , waiting_consumers = WaitingConsumers1 }, ok , Effects };
486+ apply (_ , {down , Pid , _Info }, # state {consumers = Cons0 ,
487+ enqueuers = Enqs0 } = State0 ) ->
485488 % Remove any enqueuer for the same pid and enqueue any pending messages
486489 % This should be ok as we won't see any more enqueues from this pid
487490 State1 = case maps :take (Pid , Enqs0 ) of
@@ -492,16 +495,18 @@ apply(_, {down, Pid, _Info}, #state{consumers = Cons0,
492495 error ->
493496 State0
494497 end ,
498+ {Effects1 , State2 } = maybe_deal_with_waiting_consumers_when_channel_goes_down (Pid , State1 ),
495499 % return checked out messages to main queue
496500 % Find the consumers for the down pid
497501 DownConsumers = maps :keys (
498502 maps :filter (fun ({_ , P }, _ ) -> P =:= Pid end , Cons0 )),
499- {Effects1 , State2 } = lists :foldl (fun cancel_consumer /2 , {[], State1 },
503+ {Effects2 , State3 } = lists :foldl (fun cancel_consumer /2 , {Effects1 , State2 },
500504 DownConsumers ),
501- checkout (State2 , Effects1 );
502- apply (_ , {nodeup , Node }, # state {consumers = Cons0 ,
503- enqueuers = Enqs0 ,
504- service_queue = SQ0 } = State0 ) ->
505+ checkout (State3 , Effects2 );
506+ apply (_ , {nodeup , Node }, # state {consumers = Cons0 ,
507+ enqueuers = Enqs0 ,
508+ service_queue = SQ0 ,
509+ waiting_consumers = WaitingConsumers0 } = State0 ) ->
505510 % % A node we are monitoring has come back.
506511 % % If we have suspected any processes of being
507512 % % down we should now re-issue the monitors for them to detect if they're
@@ -516,7 +521,17 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0,
516521 [P | Acc ];
517522 (_ , _ , Acc ) -> Acc
518523 end , [], Enqs0 ),
519- Monitors = [{monitor , process , P } || P <- Cons ++ Enqs ],
524+ WaitingConsumers = lists :foldl (fun ({{_ , P }, # consumer {suspected_down = true }}, Acc )
525+ when node (P ) =:= Node ->
526+ [P | Acc ];
527+ (_ , Acc ) -> Acc
528+ end , [], WaitingConsumers0 ),
529+
530+ Monitors = [{monitor , process , P } || P <- Cons ++ Enqs ++ WaitingConsumers ],
531+
532+ % un-suspect waiting consumers when necessary
533+ WaitingConsumers1 = maybe_mark_suspect_waiting_consumers (Node , State0 , false ),
534+
520535 Enqs1 = maps :map (fun (P , E ) when node (P ) =:= Node ->
521536 E # enqueuer {suspected_down = false };
522537 (_ , E ) -> E
@@ -532,12 +547,45 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0,
532547 end , {Cons0 , SQ0 , Monitors }, Cons0 ),
533548 % TODO: avoid list concat
534549 checkout (State0 # state {consumers = Cons1 , enqueuers = Enqs1 ,
535- service_queue = SQ }, Effects );
550+ service_queue = SQ , waiting_consumers = WaitingConsumers1 }, Effects );
536551apply (_ , {nodedown , _Node }, State ) ->
537552 {State , ok };
538553apply (_ , # update_config {config = Conf }, State ) ->
539554 {update_config (Conf , State ), ok }.
540555
556+ maybe_deal_with_waiting_consumers_when_channel_goes_down (_Pid , # state {consumer_strategy = default } = State ) ->
557+ {[], State };
558+ maybe_deal_with_waiting_consumers_when_channel_goes_down (_Pid , # state {consumer_strategy = single_active ,
559+ waiting_consumers = []} = State ) ->
560+ {[], State };
561+ maybe_deal_with_waiting_consumers_when_channel_goes_down (Pid , # state {consumer_strategy = single_active ,
562+ waiting_consumers = WaitingConsumers0 } = State0 ) ->
563+ % get cancel effects for down waiting consumers
564+ DownWaitingConsumers = lists :filter (fun ({{_ , P }, _ }) -> P =:= Pid end , WaitingConsumers0 ),
565+ Effects1 = lists :foldl (fun ({ConsumerId , _ }, Effects ) ->
566+ cancel_consumer_effects (ConsumerId , State0 , Effects )
567+ end , [], DownWaitingConsumers ),
568+ % update state to have only up waiting consumers
569+ WaitingConsumersStillUp = lists :filter (fun ({{_ , P }, _ }) -> P =/= Pid end , WaitingConsumers0 ),
570+ State2 = State0 # state {waiting_consumers = WaitingConsumersStillUp },
571+ {Effects1 , State2 }.
572+
573+ maybe_mark_suspect_waiting_consumers (_Node , # state {consumer_strategy = default }, _Suspected ) ->
574+ [];
575+ maybe_mark_suspect_waiting_consumers (_Node , # state {consumer_strategy = single_active ,
576+ waiting_consumers = []}, _Suspected ) ->
577+ [];
578+ maybe_mark_suspect_waiting_consumers (Node , # state {consumer_strategy = single_active ,
579+ waiting_consumers = WaitingConsumers }, Suspected ) ->
580+ [begin
581+ case node (P ) of
582+ Node ->
583+ {ConsumerId , Consumer # consumer {suspected_down = Suspected }};
584+ _ ->
585+ {ConsumerId , Consumer }
586+ end
587+ end || {{_ , P } = ConsumerId , Consumer } <- WaitingConsumers ].
588+
541589- spec state_enter (ra_server :ra_state (), state ()) -> ra_machine :effects ().
542590state_enter (leader , # state {consumers = Cons ,
543591 enqueuers = Enqs ,
@@ -652,17 +700,27 @@ query_processes(#state{enqueuers = Enqs, consumers = Cons0}) ->
652700query_ra_indexes (# state {ra_indexes = RaIndexes }) ->
653701 RaIndexes .
654702
655- query_consumer_count (# state {consumers = Consumers }) ->
656- maps :size (Consumers ).
657-
658- query_consumers (# state {consumers = Consumers }) ->
659- maps :map (fun ({Tag , Pid }, # consumer {meta = Meta }) ->
660- {Pid , Tag ,
661- maps :get (ack , Meta , undefined ),
662- maps :get (prefetch , Meta , undefined ),
663- maps :get (args , Meta , []),
664- maps :get (username , Meta , undefined )}
665- end , Consumers ).
703+ query_consumer_count (# state {consumers = Consumers , waiting_consumers = WaitingConsumers }) ->
704+ maps :size (Consumers ) + length (WaitingConsumers ).
705+
706+ query_consumers (# state {consumers = Consumers , waiting_consumers = WaitingConsumers }) ->
707+ FromConsumers = maps :map (fun ({Tag , Pid }, # consumer {meta = Meta }) ->
708+ {Pid , Tag ,
709+ maps :get (ack , Meta , undefined ),
710+ maps :get (prefetch , Meta , undefined ),
711+ maps :get (args , Meta , []),
712+ maps :get (username , Meta , undefined )}
713+ end , Consumers ),
714+ FromWaitingConsumers = lists :foldl (fun ({{Tag , Pid }, # consumer {meta = Meta }}, Acc ) ->
715+ maps :put ({Tag , Pid },
716+ {Pid , Tag ,
717+ maps :get (ack , Meta , undefined ),
718+ maps :get (prefetch , Meta , undefined ),
719+ maps :get (args , Meta , []),
720+ maps :get (username , Meta , undefined )},
721+ Acc )
722+ end , #{}, WaitingConsumers ),
723+ maps :merge (FromConsumers , FromWaitingConsumers ).
666724% % other
667725
668726- spec usage (atom ()) -> float ().
@@ -739,8 +797,9 @@ cancel_consumer(ConsumerId,
739797 % The cancelled consumer is not the active one
740798 % Just remove it from idle_consumers
741799 {value , _Consumer , WaitingConsumers1 } = lists :keytake (ConsumerId , 1 , WaitingConsumers0 ),
800+ Effects = cancel_consumer_effects (ConsumerId , State0 , Effects0 ),
742801 % A waiting consumer isn't supposed to have any checked out messages, so nothing special to do here
743- {Effects0 , State0 # state {waiting_consumers = WaitingConsumers1 }}
802+ {Effects , State0 # state {waiting_consumers = WaitingConsumers1 }}
744803 end .
745804
746805cancel_consumer0 (ConsumerId ,
@@ -1993,41 +2052,167 @@ single_active_consumer_test() ->
19932052 ? assertNotEqual (false , lists :keyfind ({<<" ctag4" >>, self ()}, 1 , State1 # state .waiting_consumers )),
19942053
19952054 % cancelling a waiting consumer
1996- {State2 , _ , _ } = apply (#{}, # checkout {spec = cancel , consumer_id = {<<" ctag3" >>, self ()}}, State1 ),
2055+ {State2 , _ , Effects1 } = apply (#{}, # checkout {spec = cancel , consumer_id = {<<" ctag3" >>, self ()}}, State1 ),
19972056 % the active consumer should still be in place
19982057 ? assertEqual (1 , map_size (State2 # state .consumers )),
19992058 ? assert (maps :is_key ({<<" ctag1" >>, self ()}, State2 # state .consumers )),
20002059 % the cancelled consumer has been removed from waiting consumers
20012060 ? assertEqual (2 , length (State2 # state .waiting_consumers )),
20022061 ? assertNotEqual (false , lists :keyfind ({<<" ctag2" >>, self ()}, 1 , State2 # state .waiting_consumers )),
20032062 ? assertNotEqual (false , lists :keyfind ({<<" ctag4" >>, self ()}, 1 , State2 # state .waiting_consumers )),
2063+ % there are some effects to unregister the consumer
2064+ ? assertEqual (1 , length (Effects1 )),
20042065
20052066 % cancelling the active consumer
2006- {State3 , _ , _ } = apply (#{}, # checkout {spec = cancel , consumer_id = {<<" ctag1" >>, self ()}}, State2 ),
2067+ {State3 , _ , Effects2 } = apply (#{}, # checkout {spec = cancel , consumer_id = {<<" ctag1" >>, self ()}}, State2 ),
20072068 % the second registered consumer is now the active one
20082069 ? assertEqual (1 , map_size (State3 # state .consumers )),
20092070 ? assert (maps :is_key ({<<" ctag2" >>, self ()}, State3 # state .consumers )),
20102071 % the new active consumer is no longer in the waiting list
20112072 ? assertEqual (1 , length (State3 # state .waiting_consumers )),
20122073 ? assertNotEqual (false , lists :keyfind ({<<" ctag4" >>, self ()}, 1 , State3 # state .waiting_consumers )),
2074+ % there are some effects to unregister the consumer
2075+ ? assertEqual (1 , length (Effects2 )),
20132076
20142077 % cancelling the active consumer
2015- {State4 , _ , _ } = apply (#{}, # checkout {spec = cancel , consumer_id = {<<" ctag2" >>, self ()}}, State3 ),
2078+ {State4 , _ , Effects3 } = apply (#{}, # checkout {spec = cancel , consumer_id = {<<" ctag2" >>, self ()}}, State3 ),
20162079 % the last waiting consumer became the active one
20172080 ? assertEqual (1 , map_size (State4 # state .consumers )),
20182081 ? assert (maps :is_key ({<<" ctag4" >>, self ()}, State4 # state .consumers )),
20192082 % the waiting consumer list is now empty
20202083 ? assertEqual (0 , length (State4 # state .waiting_consumers )),
2084+ % there are some effects to unregister the consumer
2085+ ? assertEqual (1 , length (Effects3 )),
20212086
20222087 % cancelling the last consumer
2023- {State5 , _ , _ } = apply (#{}, # checkout {spec = cancel , consumer_id = {<<" ctag4" >>, self ()}}, State4 ),
2088+ {State5 , _ , Effects4 } = apply (#{}, # checkout {spec = cancel , consumer_id = {<<" ctag4" >>, self ()}}, State4 ),
20242089 % no active consumer anymore
20252090 ? assertEqual (0 , map_size (State5 # state .consumers )),
20262091 % still nothing in the waiting list
20272092 ? assertEqual (0 , length (State5 # state .waiting_consumers )),
2093+ % there is an effect to unregister the consumer + queue inactive effect
2094+ ? assertEqual (1 + 1 , length (Effects4 )),
20282095
20292096 ok .
20302097
2098+ single_active_consumer_cancel_consumer_when_channel_is_down_test () ->
2099+ State0 = init (#{name => ? FUNCTION_NAME ,
2100+ queue_resource => rabbit_misc :r (" /" , queue ,
2101+ atom_to_binary (? FUNCTION_NAME , utf8 )),
2102+ shadow_copy_interval => 0 ,
2103+ single_active_consumer_on => true }),
2104+
2105+ DummyFunction = fun () -> ok end ,
2106+ Pid1 = spawn (DummyFunction ),
2107+ Pid2 = spawn (DummyFunction ),
2108+ Pid3 = spawn (DummyFunction ),
2109+
2110+ % adding some consumers
2111+ AddConsumer = fun ({CTag , ChannelId }, State ) ->
2112+ {NewState , _ , _ } = apply (
2113+ #{},
2114+ # checkout {spec = {once , 1 , simple_prefetch },
2115+ meta = #{},
2116+ consumer_id = {CTag , ChannelId }},
2117+ State ),
2118+ NewState
2119+ end ,
2120+ State1 = lists :foldl (AddConsumer , State0 ,
2121+ [{<<" ctag1" >>, Pid1 }, {<<" ctag2" >>, Pid2 }, {<<" ctag3" >>, Pid2 }, {<<" ctag4" >>, Pid3 }]),
2122+
2123+ % the channel of the active consumer goes down
2124+ {State2 , _ , Effects } = apply (#{}, {down , Pid1 , doesnotmatter }, State1 ),
2125+ % fell back to another consumer
2126+ ? assertEqual (1 , map_size (State2 # state .consumers )),
2127+ % there are still waiting consumers
2128+ ? assertEqual (2 , length (State2 # state .waiting_consumers )),
2129+ % the effect to unregister the consumer is there
2130+ ? assertEqual (1 , length (Effects )),
2131+
2132+ % the channel of the active consumer and a waiting consumer goes down
2133+ {State3 , _ , Effects2 } = apply (#{}, {down , Pid2 , doesnotmatter }, State2 ),
2134+ % fell back to another consumer
2135+ ? assertEqual (1 , map_size (State3 # state .consumers )),
2136+ % no more waiting consumer
2137+ ? assertEqual (0 , length (State3 # state .waiting_consumers )),
2138+ % effects to cancel both consumers of this channel
2139+ ? assertEqual (2 , length (Effects2 )),
2140+
2141+ % the last channel goes down
2142+ {State4 , _ , Effects3 } = apply (#{}, {down , Pid3 , doesnotmatter }, State3 ),
2143+ % no more consumers
2144+ ? assertEqual (0 , map_size (State4 # state .consumers )),
2145+ ? assertEqual (0 , length (State4 # state .waiting_consumers )),
2146+ % there is an effect to unregister the consumer + queue inactive effect
2147+ ? assertEqual (1 + 1 , length (Effects3 )),
2148+
2149+ ok .
2150+
2151+ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnection_test () ->
2152+ State0 = init (#{name => ? FUNCTION_NAME ,
2153+ queue_resource => rabbit_misc :r (" /" , queue ,
2154+ atom_to_binary (? FUNCTION_NAME , utf8 )),
2155+ shadow_copy_interval => 0 ,
2156+ single_active_consumer_on => true }),
2157+
2158+ % adding some consumers
2159+ AddConsumer = fun (CTag , State ) ->
2160+ {NewState , _ , _ } = apply (
2161+ #{},
2162+ # checkout {spec = {once , 1 , simple_prefetch },
2163+ meta = #{},
2164+ consumer_id = {CTag , self ()}},
2165+ State ),
2166+ NewState
2167+ end ,
2168+ State1 = lists :foldl (AddConsumer , State0 , [<<" ctag1" >>, <<" ctag2" >>, <<" ctag3" >>, <<" ctag4" >>]),
2169+
2170+ % simulate node goes down
2171+ {State2 , _ , _ } = apply (#{}, {down , self (), noconnection }, State1 ),
2172+
2173+ % all the waiting consumers should be suspected down
2174+ ? assertEqual (3 , length (State2 # state .waiting_consumers )),
2175+ lists :foreach (fun ({_ , # consumer {suspected_down = SuspectedDown }}) ->
2176+ ? assert (SuspectedDown )
2177+ end , State2 # state .waiting_consumers ),
2178+
2179+ % simulate node goes back up
2180+ {State3 , _ , _ } = apply (#{}, {nodeup , node (self ())}, State2 ),
2181+
2182+ % all the waiting consumers should be un-suspected
2183+ ? assertEqual (3 , length (State3 # state .waiting_consumers )),
2184+ lists :foreach (fun ({_ , # consumer {suspected_down = SuspectedDown }}) ->
2185+ ? assertNot (SuspectedDown )
2186+ end , State3 # state .waiting_consumers ),
2187+
2188+ ok .
2189+
2190+ query_consumers_test () ->
2191+ State0 = init (#{name => ? FUNCTION_NAME ,
2192+ queue_resource => rabbit_misc :r (" /" , queue ,
2193+ atom_to_binary (? FUNCTION_NAME , utf8 )),
2194+ shadow_copy_interval => 0 ,
2195+ single_active_consumer_on => true }),
2196+
2197+ % adding some consumers
2198+ AddConsumer = fun (CTag , State ) ->
2199+ {NewState , _ , _ } = apply (
2200+ #{},
2201+ # checkout {spec = {once , 1 , simple_prefetch },
2202+ meta = #{},
2203+ consumer_id = {CTag , self ()}},
2204+ State ),
2205+ NewState
2206+ end ,
2207+ State1 = lists :foldl (AddConsumer , State0 , [<<" ctag1" >>, <<" ctag2" >>, <<" ctag3" >>, <<" ctag4" >>]),
2208+
2209+ ? assertEqual (4 , query_consumer_count (State1 )),
2210+ Consumers = query_consumers (State1 ),
2211+ ? assertEqual (4 , maps :size (Consumers )),
2212+ maps :fold (fun ({_Tag , Pid }, {Pid , _Tag , _ , _ , _ , _ }, _Acc ) ->
2213+ ? assertEqual (self (), Pid )
2214+ end , [], Consumers ).
2215+
20312216meta (Idx ) ->
20322217 #{index => Idx , term => 1 }.
20332218
0 commit comments