@@ -413,26 +413,20 @@ apply(_, #checkout{spec = Spec, meta = Meta,
413413 State1 = update_consumer (ConsumerId , Meta , Spec , State0 ),
414414 checkout (State1 , [{monitor , process , Pid }]);
415415apply (#{index := RaftIdx }, # purge {},
416- # state {consumers = Cons0 , ra_indexes = Indexes } = State0 ) ->
417- Total = rabbit_fifo_index :size (Indexes ),
418- {State1 , Effects1 } =
419- maps :fold (
420- fun (ConsumerId , C = # consumer {checked_out = Checked0 },
421- {StateAcc0 , EffectsAcc0 }) ->
422- MsgRaftIdxs = [RIdx || {_MsgInId , {RIdx , _ }}
423- <- maps :values (Checked0 )],
424- complete (ConsumerId , MsgRaftIdxs , maps :size (Checked0 ), C ,
425- #{}, EffectsAcc0 , StateAcc0 )
426- end , {State0 , []}, Cons0 ),
427- {State , _ , Effects } =
428- update_smallest_raft_index (
429- RaftIdx , Indexes ,
430- State1 # state {ra_indexes = rabbit_fifo_index :empty (),
431- messages = #{},
432- returns = lqueue :new (),
433- msg_bytes_enqueue = 0 ,
434- msg_bytes_checkout = 0 ,
435- low_msg_num = undefined }, Effects1 ),
416+ # state {ra_indexes = Indexes0 ,
417+ messages = Messages } = State0 ) ->
418+ Total = maps :size (Messages ),
419+ Indexes = lists :foldl (fun rabbit_fifo_index :delete /2 ,
420+ Indexes0 ,
421+ [I || {I , _ } <- lists :sort (maps :values (Messages ))]),
422+ {State , _ , Effects } =
423+ update_smallest_raft_index (RaftIdx , Indexes0 ,
424+ State0 # state {ra_indexes = Indexes ,
425+ messages = #{},
426+ returns = lqueue :new (),
427+ msg_bytes_enqueue = 0 ,
428+ low_msg_num = undefined },
429+ []),
436430 % % as we're not checking out after a purge (no point) we have to
437431 % % reverse the effects ourselves
438432 {State , {purge , Total },
@@ -1876,11 +1870,12 @@ purge_with_checkout_test() ->
18761870 % % assert message bytes are non zero
18771871 ? assert (State2 # state .msg_bytes_checkout > 0 ),
18781872 ? assert (State2 # state .msg_bytes_enqueue > 0 ),
1879- {State3 , {purge , 2 }, _ } = apply (meta (2 ), make_purge (), State2 ),
1880- ? assertEqual ( 0 , State3 # state .msg_bytes_checkout ),
1873+ {State3 , {purge , 1 }, _ } = apply (meta (2 ), make_purge (), State2 ),
1874+ ? assert ( State2 # state .msg_bytes_checkout > 0 ),
18811875 ? assertEqual (0 , State3 # state .msg_bytes_enqueue ),
1876+ ? assertEqual (1 , rabbit_fifo_index :size (State3 # state .ra_indexes )),
18821877 # consumer {checked_out = Checked } = maps :get (Cid , State3 # state .consumers ),
1883- ? assertEqual (0 , maps :size (Checked )),
1878+ ? assertEqual (1 , maps :size (Checked )),
18841879 ok .
18851880
18861881down_returns_checked_out_in_order_test () ->
0 commit comments