184184         % % command: `{consumer_credit, ReceiverDeliveryCount, Credit}'
185185         credit_mode  =  simple_prefetch  ::  credit_mode (), %  part of snapshot data
186186         lifetime  =  once  ::  once  | auto ,
187-          suspected_down  =  false  ::  boolean ()
187+          suspected_down  =  false  ::  'cancel'  |  boolean ()
188188        }).
189189
190190- type  consumer () ::  # consumer {}.
@@ -426,10 +426,7 @@ apply(Meta, #checkout{spec = {dequeue, Settlement},
426426            end 
427427    end ;
428428apply (Meta , # checkout {spec  =  cancel , consumer_id  =  ConsumerId }, State0 ) ->
429-     {State , Effects } =  cancel_consumer (ConsumerId , State0 , []),
430-     %  TODO: here we should really demonitor the pid but _only_ if it has no
431-     %  other consumers or enqueuers. leaving a monitor in place isn't harmful
432-     %  however
429+     {State , Effects } =  cancel_consumer (ConsumerId , State0 , [], consumer_cancel ),
433430    checkout (Meta , State , Effects );
434431apply (Meta , # checkout {spec  =  Spec , meta  =  ConsumerMeta ,
435432                      consumer_id  =  {_ , Pid } =  ConsumerId },
@@ -469,7 +466,8 @@ apply(_, {down, ConsumerPid, noconnection},
469466    {Cons , State , Effects1 } =  maps :fold (
470467                                  fun ({_ , P } =  K ,
471468                                      # consumer {checked_out  =  Checked0 } =  C ,
472-                                       {Co , St0 , Eff }) when  node (P ) =:=  Node  ->
469+                                       {Co , St0 , Eff }) when  (node (P ) =:=  Node ) and 
470+                                                            (C # consumer .suspected_down  =/=  cancel )->
473471                                          St  =  return_all (St0 , Checked0 ),
474472                                          Credit  =  increase_credit (C , maps :size (Checked0 )),
475473                                          Eff1  =  ConsumerUpdateActiveFun (St , K , C , false , suspected_down , Eff ),
@@ -514,7 +512,7 @@ apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0,
514512    DownConsumers  =  maps :keys (
515513                      maps :filter (fun ({_ , P }, _ ) -> P  =:=  Pid  end , Cons0 )),
516514    {State , Effects } =  lists :foldl (fun (ConsumerId , {S , E }) ->
517-                                            cancel_consumer (ConsumerId , S , E )
515+                                            cancel_consumer (ConsumerId , S , E ,  down )
518516                                   end , {State2 , Effects1 }, DownConsumers ),
519517    checkout (Meta , State , Effects );
520518apply (Meta , {nodeup , Node }, # state {consumers  =  Cons0 ,
@@ -538,7 +536,7 @@ apply(Meta, {nodeup, Node}, #state{consumers = Cons0,
538536    ConsumerUpdateActiveFun  =  consumer_active_flag_update_function (State0 ),
539537    {Cons1 , SQ , Effects } = 
540538        maps :fold (fun ({_ , P } =  ConsumerId , C , {CAcc , SQAcc , EAcc })
541-                         when  node (P ) =:=  Node  ->
539+                         when  ( node (P ) =:=  Node )  and  ( C # consumer . suspected_down   =/=   cancel )  ->
542540                          EAcc1  =  ConsumerUpdateActiveFun (State0 , ConsumerId , C , true , up , EAcc ),
543541                          update_or_remove_sub (
544542                            ConsumerId , C # consumer {suspected_down  =  false },
@@ -606,7 +604,8 @@ maybe_mark_suspect_waiting_consumers(Node,
606604             _  ->
607605                 {ConsumerId , Consumer }
608606         end 
609-      end  || {{_ , P } =  ConsumerId , Consumer } <-  WaitingConsumers ].
607+      end  || {{_ , P } =  ConsumerId , Consumer } <-  WaitingConsumers ,
608+             Consumer # consumer .suspected_down  =/=  cancel ].
610609
611610- spec  state_enter (ra_server :ra_state (), state ()) -> ra_machine :effects ().
612611state_enter (leader , # state {consumers  =  Cons ,
@@ -758,18 +757,24 @@ query_consumers(#state{consumers = Consumers,
758757                                          end 
759758                                      end 
760759                              end ,
761-     FromConsumers  =  maps :map (fun  ({Tag , Pid }, # consumer {meta  =  Meta } =  Consumer ) ->
762-                                     {Active , ActivityStatus } =  ActiveActivityStatusFun ({Tag , Pid }, Consumer ),
763-                                     {Pid , Tag ,
764-                                       maps :get (ack , Meta , undefined ),
765-                                       maps :get (prefetch , Meta , undefined ),
766-                                       Active ,
767-                                       ActivityStatus ,
768-                                       maps :get (args , Meta , []),
769-                                       maps :get (username , Meta , undefined )}
770-                              end , Consumers ),
760+     FromConsumers  =  maps :fold (fun  (_ , # consumer {suspected_down  =  cancel }, Acc ) ->
761+                                       Acc ;
762+                                    ({Tag , Pid }, # consumer {meta  =  Meta } =  Consumer , Acc ) ->
763+                                       {Active , ActivityStatus } =  ActiveActivityStatusFun ({Tag , Pid }, Consumer ),
764+                                       maps :put ({Tag , Pid },
765+                                                {Pid , Tag ,
766+                                                 maps :get (ack , Meta , undefined ),
767+                                                 maps :get (prefetch , Meta , undefined ),
768+                                                 Active ,
769+                                                 ActivityStatus ,
770+                                                 maps :get (args , Meta , []),
771+                                                 maps :get (username , Meta , undefined )},
772+                                                Acc )
773+                               end , #{}, Consumers ),
771774    FromWaitingConsumers  = 
772-         lists :foldl (fun ({{Tag , Pid }, # consumer {meta  =  Meta } =  Consumer }, Acc ) ->
775+         lists :foldl (fun  ({_ , # consumer {suspected_down  =  cancel }}, Acc ) ->
776+                                       Acc ;
777+                         ({{Tag , Pid }, # consumer {meta  =  Meta } =  Consumer }, Acc ) ->
773778                            {Active , ActivityStatus } =  ActiveActivityStatusFun ({Tag , Pid }, Consumer ),
774779                            maps :put ({Tag , Pid },
775780                                     {Pid , Tag ,
@@ -854,42 +859,42 @@ num_checked_out(#state{consumers = Cons}) ->
854859                end , 0 , maps :values (Cons )).
855860
856861cancel_consumer (ConsumerId ,
857-                 # state {consumer_strategy  =  default } =  State , Effects ) ->
862+                 # state {consumer_strategy  =  default } =  State , Effects ,  Reason ) ->
858863    % % general case, single active consumer off
859-     cancel_consumer0 (ConsumerId , State , Effects );
864+     cancel_consumer0 (ConsumerId , State , Effects ,  Reason );
860865cancel_consumer (ConsumerId ,
861866                # state {consumer_strategy  =  single_active ,
862867                       waiting_consumers  =  []} =  State ,
863-                 Effects ) ->
868+                 Effects ,  Reason ) ->
864869    % % single active consumer on, no consumers are waiting
865-     cancel_consumer0 (ConsumerId , State , Effects );
870+     cancel_consumer0 (ConsumerId , State , Effects ,  Reason );
866871cancel_consumer (ConsumerId ,
867872                # state {consumers  =  Cons0 ,
868873                       consumer_strategy  =  single_active ,
869874                       waiting_consumers  =  WaitingConsumers0 } =  State0 ,
870-                Effects0 ) ->
875+                Effects0 ,  Reason ) ->
871876    % % single active consumer on, consumers are waiting
872877    case  maps :take (ConsumerId , Cons0 ) of 
873-         {# consumer { checked_out   =   Checked0 },  _ } ->
878+         {Consumer ,  Cons1 } ->
874879            %  The active consumer is to be removed
875880            %  Cancel it
876-             State1   =   return_all ( State0 , Checked0 ),
877-             Effects1  =  cancel_consumer_effects (ConsumerId , State1 , Effects0 ),
881+             { State1 ,  Effects1 }  =   maybe_return_all ( ConsumerId ,  Consumer ,  Cons1 ,  State0 , Effects0 ,  Reason ),
882+             Effects2  =  cancel_consumer_effects (ConsumerId , State1 , Effects1 ),
878883            %  Take another one from the waiting consumers and put it in consumers
879884            [{NewActiveConsumerId , NewActiveConsumer }
880885             | RemainingWaitingConsumers ] =  WaitingConsumers0 ,
881-             # state {service_queue  =  ServiceQueue } =  State0 ,
886+             # state {service_queue  =  ServiceQueue } =  State1 ,
882887            ServiceQueue1  =  maybe_queue_consumer (NewActiveConsumerId ,
883888                                                 NewActiveConsumer ,
884889                                                 ServiceQueue ),
885-             State  =  State1 # state {consumers  =  #{ NewActiveConsumerId   => 
886-                                                NewActiveConsumer } ,
890+             State  =  State1 # state {consumers  =  maps : put ( NewActiveConsumerId , 
891+                                                        NewActiveConsumer ,  State1 # state . consumers ) ,
887892                                 service_queue  =  ServiceQueue1 ,
888893                                 waiting_consumers  =  RemainingWaitingConsumers },
889-             Effects2  =  consumer_update_active_effects (State , NewActiveConsumerId ,
894+             Effects  =  consumer_update_active_effects (State , NewActiveConsumerId ,
890895                                                      NewActiveConsumer , true ,
891-                                                       single_active , Effects1 ),
892-             {State , Effects2 };
896+                                                       single_active , Effects2 ),
897+             {State , Effects };
893898        error  ->
894899            %  The cancelled consumer is not the active one
895900            %  Just remove it from idle_consumers
@@ -914,23 +919,39 @@ consumer_update_active_effects(#state{queue_resource = QName },
914919      [QName , ConsumerId , false , Ack , Prefetch , Active , ActivityStatus , Args ]}
915920      | Effects ].
916921
917- cancel_consumer0 (ConsumerId ,
918-                 # state {consumers  =  C0 } =  S0 , Effects0 ) ->
922+ cancel_consumer0 (ConsumerId , # state {consumers  =  C0 } =  S0 , Effects0 , Reason ) ->
919923    case  maps :take (ConsumerId , C0 ) of 
920-         {# consumer { checked_out   =   Checked0 },  Cons } ->
921-             S   =   return_all ( S0 , Checked0 ),
922-             Effects  =  cancel_consumer_effects (ConsumerId , S , Effects0 ),
923-             case  maps :size (Cons ) of 
924+         {Consumer ,  Cons1 } ->
925+             { S ,  Effects2 }  =   maybe_return_all ( ConsumerId ,  Consumer ,  Cons1 ,  S0 , Effects0 ,  Reason ),
926+             Effects  =  cancel_consumer_effects (ConsumerId , S , Effects2 ),
927+             case  maps :size (S # state . consumers ) of 
924928                0  ->
925-                     {S # state { consumers   =   Cons } , [{aux , inactive } | Effects ]};
929+                     {S , [{aux , inactive } | Effects ]};
926930                _  ->
927-                     {S # state { consumers   =   Cons } , Effects }
931+                     {S , Effects }
928932            end ;
929933        error  ->
930934            % % already removed: do nothing
931935            {S0 , Effects0 }
932936    end .
933937
938+ maybe_return_all (ConsumerId , # consumer {checked_out  =  Checked0 } =  Consumer , Cons1 ,
939+                  # state {consumers  =  C0 ,
940+                         service_queue  =  SQ0 } =  S0 , Effects0 , Reason ) ->
941+     case  Reason  of 
942+         consumer_cancel  ->
943+             {Cons , SQ , Effects1 } = 
944+                 update_or_remove_sub (ConsumerId ,
945+                                      Consumer # consumer {lifetime  =  once ,
946+                                                        credit  =  0 ,
947+                                                        suspected_down  =  cancel },
948+                                      C0 , SQ0 , Effects0 ),
949+             {S0 # state {consumers  =  Cons , service_queue  =  SQ }, Effects1 };
950+         down  ->
951+             S1  =  return_all (S0 , Checked0 ),
952+             {S1 # state {consumers  =  Cons1 }, Effects0 }
953+     end .
954+ 
934955apply_enqueue (#{index  :=  RaftIdx } =  Meta , From , Seq , RawMsg , State0 ) ->
935956    Bytes  =  message_size (RawMsg ),
936957    case  maybe_enqueue (RaftIdx , From , Seq , RawMsg , [], State0 ) of 
@@ -1303,6 +1324,8 @@ checkout_one(#state{service_queue = SQ0,
13031324                            % % can happen when draining
13041325                            % % recurse without consumer on queue
13051326                            checkout_one (InitState # state {service_queue  =  SQ1 });
1327+                         {ok , # consumer {suspected_down  =  cancel }} ->
1328+                             checkout_one (InitState # state {service_queue  =  SQ1 });
13061329                        {ok , # consumer {suspected_down  =  true }} ->
13071330                            checkout_one (InitState # state {service_queue  =  SQ1 });
13081331                        {ok , # consumer {checked_out  =  Checked0 ,
0 commit comments