@@ -813,11 +813,9 @@ purge_node(Meta, Node, State, Effects) ->
813
813
end , {State , Effects }, all_pids_for (Node , State )).
814
814
815
815
% % any downs that re not noconnection
816
- handle_down (Meta , Pid , #? STATE {consumers = Cons0 ,
817
- enqueuers = Enqs0 } = State0 ) ->
818
- % Remove any enqueuer for the down pid
819
- State1 = State0 #? STATE {enqueuers = maps :remove (Pid , Enqs0 )},
820
- {Effects1 , State2 } = handle_waiting_consumer_down (Pid , State1 ),
816
+ handle_down (Meta , Pid , #? MODULE {consumers = Cons0 } = State0 ) ->
817
+ {Effects0 , State1 } = handle_enqueuer_down (Meta , Pid , State0 ),
818
+ {Effects1 , State2 } = handle_waiting_consumer_down (Pid , State1 , Effects0 ),
821
819
% return checked out messages to main queue
822
820
% Find the consumers for the down pid
823
821
DownConsumers = maps :keys (
@@ -826,6 +824,23 @@ handle_down(Meta, Pid, #?STATE{consumers = Cons0,
826
824
cancel_consumer (Meta , ConsumerId , S , E , down )
827
825
end , {State2 , Effects1 }, DownConsumers ).
828
826
827
+ handle_enqueuer_down (#{index := Idx }, Pid ,
828
+ #? MODULE {enqueuers = Enqs0 } = State0 ) ->
829
+ case maps :take (Pid , Enqs0 ) of
830
+ {_Enqueuer , Enqs } ->
831
+ State = State0 #? MODULE {enqueuers = Enqs },
832
+ % % When there are no more enqueuers connected, suggest a checkpoint
833
+ % % so that recovery is fast.
834
+ case Enqs =:= #{} of
835
+ true ->
836
+ {[{checkpoint , Idx , State }], State };
837
+ false ->
838
+ {[], State }
839
+ end ;
840
+ error ->
841
+ {[], State0 }
842
+ end .
843
+
829
844
consumer_active_flag_update_function (
830
845
#? STATE {cfg = # cfg {consumer_strategy = competing }}) ->
831
846
fun (State , ConsumerId , Consumer , Active , ActivityStatus , Effects ) ->
@@ -839,22 +854,25 @@ consumer_active_flag_update_function(
839
854
end .
840
855
841
856
handle_waiting_consumer_down (_Pid ,
842
- #? STATE {cfg = # cfg {consumer_strategy = competing }} = State ) ->
843
- {[], State };
857
+ #? MODULE {cfg = # cfg {consumer_strategy = competing }} = State ,
858
+ Effects0 ) ->
859
+ {Effects0 , State };
844
860
handle_waiting_consumer_down (_Pid ,
845
- #? STATE {cfg = # cfg {consumer_strategy = single_active },
846
- waiting_consumers = []} = State ) ->
847
- {[], State };
861
+ #? MODULE {cfg = # cfg {consumer_strategy = single_active },
862
+ waiting_consumers = []} = State ,
863
+ Effects0 ) ->
864
+ {Effects0 , State };
848
865
handle_waiting_consumer_down (Pid ,
849
- #? STATE {cfg = # cfg {consumer_strategy = single_active },
850
- waiting_consumers = WaitingConsumers0 } = State0 ) ->
866
+ #? MODULE {cfg = # cfg {consumer_strategy = single_active },
867
+ waiting_consumers = WaitingConsumers0 } = State0 ,
868
+ Effects0 ) ->
851
869
% get cancel effects for down waiting consumers
852
870
Down = lists :filter (fun ({{_ , P }, _ }) -> P =:= Pid end ,
853
871
WaitingConsumers0 ),
854
872
Effects = lists :foldl (fun ({ConsumerId , _ }, Effects ) ->
855
873
cancel_consumer_effects (ConsumerId , State0 ,
856
874
Effects )
857
- end , [] , Down ),
875
+ end , Effects0 , Down ),
858
876
% update state to have only up waiting consumers
859
877
StillUp = lists :filter (fun ({{_ , P }, _ }) -> P =/= Pid end ,
860
878
WaitingConsumers0 ),
0 commit comments