@@ -1400,10 +1400,10 @@ cancel_consumer0(Meta, ConsumerKey,
14001400activate_next_consumer ({State , Effects }) ->
14011401 activate_next_consumer (State , Effects ).
14021402
1403- activate_next_consumer (#? STATE {cfg = # cfg {consumer_strategy = competing }} = State0 ,
1404- Effects0 ) ->
1405- {State0 , Effects0 };
1406- activate_next_consumer (#? STATE {consumers = Cons ,
1403+ activate_next_consumer (#? STATE {cfg = # cfg {consumer_strategy = competing }} = State ,
1404+ Effects ) ->
1405+ {State , Effects };
1406+ activate_next_consumer (#? STATE {consumers = Cons0 ,
14071407 waiting_consumers = Waiting0 } = State0 ,
14081408 Effects0 ) ->
14091409 % % invariant, the waiting list always need to be sorted by consumers that are
@@ -1416,11 +1416,11 @@ activate_next_consumer(#?STATE{consumers = Cons,
14161416 undefined
14171417 end ,
14181418
1419- case {active_consumer (Cons ), NextConsumer } of
1419+ case {active_consumer (Cons0 ), NextConsumer } of
14201420 {undefined , {NextCKey , # consumer {cfg = NextCCfg } = NextC }} ->
14211421 Remaining = tl (Waiting0 ),
14221422 % % TODO: can this happen?
1423- Consumer = case maps :get (NextCKey , Cons , undefined ) of
1423+ Consumer = case maps :get (NextCKey , Cons0 , undefined ) of
14241424 undefined ->
14251425 NextC ;
14261426 Existing ->
@@ -1433,7 +1433,7 @@ activate_next_consumer(#?STATE{consumers = Cons,
14331433 ServiceQueue1 = maybe_queue_consumer (NextCKey ,
14341434 Consumer ,
14351435 ServiceQueue ),
1436- State = State0 #? STATE {consumers = Cons #{NextCKey => Consumer },
1436+ State = State0 #? STATE {consumers = Cons0 #{NextCKey => Consumer },
14371437 service_queue = ServiceQueue1 ,
14381438 waiting_consumers = Remaining },
14391439 Effects = consumer_update_active_effects (State , Consumer ,
@@ -1452,11 +1452,12 @@ activate_next_consumer(#?STATE{consumers = Cons,
14521452 ServiceQueue1 = maybe_queue_consumer (NextCKey ,
14531453 Consumer ,
14541454 ServiceQueue ),
1455- State = State0 #? STATE {consumers = maps :remove (ActiveCKey ,
1456- Cons #{NextCKey => Consumer }),
1455+ Cons1 = Cons0 #{NextCKey => Consumer },
1456+ Cons = maps :remove (ActiveCKey , Cons1 ),
1457+ Waiting = add_waiting ({ActiveCKey , Active }, Remaining ),
1458+ State = State0 #? STATE {consumers = Cons ,
14571459 service_queue = ServiceQueue1 ,
1458- waiting_consumers =
1459- add_waiting ({ActiveCKey , Active }, Remaining )},
1460+ waiting_consumers = Waiting },
14601461 Effects = consumer_update_active_effects (State , Consumer ,
14611462 true , single_active ,
14621463 Effects0 ),
@@ -1466,9 +1467,10 @@ activate_next_consumer(#?STATE{consumers = Cons,
14661467 when WaitingPriority > ActivePriority ->
14671468 % % A higher priority consumer has attached but the current one has
14681469 % % pending messages
1469- {State0 #? STATE {consumers =
1470- Cons #{ActiveCKey => Active # consumer {status = fading }}},
1471- Effects0 };
1470+ Cons = maps :update (ActiveCKey ,
1471+ Active # consumer {status = fading },
1472+ Cons0 ),
1473+ {State0 #? STATE {consumers = Cons }, Effects0 };
14721474 _ ->
14731475 % % no activation
14741476 {State0 , Effects0 }
@@ -1504,10 +1506,10 @@ maybe_return_all(#{system_time := Ts} = Meta, ConsumerKey,
15041506 status = cancelled },
15051507 S0 ), Effects0 };
15061508 _ ->
1507- {S1 , Effects1 } = return_all (Meta , S0 , Effects0 , ConsumerKey , Consumer ),
1509+ {S1 , Effects } = return_all (Meta , S0 , Effects0 , ConsumerKey , Consumer ),
15081510 {S1 #? STATE {consumers = maps :remove (ConsumerKey , S1 #? STATE .consumers ),
15091511 last_active = Ts },
1510- Effects1 }
1512+ Effects }
15111513 end .
15121514
15131515apply_enqueue (#{index := RaftIdx ,
@@ -1743,8 +1745,7 @@ update_header(Key, UpdateFun, Default, ?TUPLE(Size, Expiry))
17431745 update_header (Key , UpdateFun , Default , #{size => Size ,
17441746 expiry => Expiry });
17451747update_header (Key , UpdateFun , Default , Header )
1746- when is_map (Header ) andalso
1747- is_map_key (size , Header ) ->
1748+ when is_map_key (size , Header ) ->
17481749 maps :update_with (Key , UpdateFun , Default , Header ).
17491750
17501751get_msg_header (? MSG (_Idx , Header )) ->
@@ -2172,24 +2173,24 @@ update_consumer(Meta, ConsumerKey, {Tag, Pid}, ConsumerMeta,
21722173 {Life , Mode } = Spec , Priority ,
21732174 #? STATE {cfg = # cfg {consumer_strategy = single_active },
21742175 consumers = Cons0 ,
2175- waiting_consumers = Waiting ,
2176- service_queue = _ServiceQueue0 } = State0 ) ->
2176+ waiting_consumers = Waiting0 ,
2177+ service_queue = _ServiceQueue0 } = State ) ->
21772178 % % if it is the current active consumer, just update
21782179 % % if it is a cancelled active consumer, add to waiting unless it is the only
21792180 % % one, then merge
21802181 case active_consumer (Cons0 ) of
21812182 {ConsumerKey , # consumer {status = up } = Consumer0 } ->
21822183 Consumer = merge_consumer (Meta , Consumer0 , ConsumerMeta ,
21832184 Spec , Priority ),
2184- {Consumer , update_or_remove_con (Meta , ConsumerKey , Consumer , State0 )};
2185+ {Consumer , update_or_remove_con (Meta , ConsumerKey , Consumer , State )};
21852186 undefined when is_map_key (ConsumerKey , Cons0 ) ->
21862187 % % there is no active consumer and the current consumer is in the
21872188 % % consumers map and thus must be cancelled, in this case we can just
21882189 % % merge and effectively make this the current active one
21892190 Consumer0 = maps :get (ConsumerKey , Cons0 ),
21902191 Consumer = merge_consumer (Meta , Consumer0 , ConsumerMeta ,
21912192 Spec , Priority ),
2192- {Consumer , update_or_remove_con (Meta , ConsumerKey , Consumer , State0 )};
2193+ {Consumer , update_or_remove_con (Meta , ConsumerKey , Consumer , State )};
21932194 _ ->
21942195 % % add as a new waiting consumer
21952196 Credit = included_credit (Mode ),
@@ -2202,9 +2203,8 @@ update_consumer(Meta, ConsumerKey, {Tag, Pid}, ConsumerMeta,
22022203 credit_mode = Mode },
22032204 credit = Credit ,
22042205 delivery_count = DeliveryCount },
2205- {Consumer ,
2206- State0 #? STATE {waiting_consumers =
2207- add_waiting ({ConsumerKey , Consumer }, Waiting )}}
2206+ Waiting = add_waiting ({ConsumerKey , Consumer }, Waiting0 ),
2207+ {Consumer , State #? STATE {waiting_consumers = Waiting }}
22082208 end .
22092209
22102210add_waiting ({Key , _ } = New , Waiting ) ->
@@ -2496,10 +2496,8 @@ message_size(Msg) ->
24962496 false ->
24972497 % % probably only hit this for testing so ok to use erts_debug
24982498 {0 , erts_debug :size (Msg )}
2499-
25002499 end .
25012500
2502-
25032501all_nodes (#? STATE {consumers = Cons0 ,
25042502 enqueuers = Enqs0 ,
25052503 waiting_consumers = WaitingConsumers0 }) ->
@@ -2577,9 +2575,10 @@ get_priority(#{priority := Priority}) ->
25772575get_priority (#{args := Args }) ->
25782576 % % fallback, v3 option
25792577 case rabbit_misc :table_lookup (Args , <<" x-priority" >>) of
2580- {_Key , Value } ->
2578+ {_Type , Value } ->
25812579 Value ;
2582- _ -> 0
2580+ _ ->
2581+ 0
25832582 end ;
25842583get_priority (_ ) ->
25852584 0 .
@@ -2713,7 +2712,6 @@ consumer_key_from_id(ConsumerId, {_, _, I}) ->
27132712consumer_key_from_id (_ConsumerId , none ) ->
27142713 error .
27152714
2716-
27172715consumer_cancel_info (ConsumerKey , #? STATE {consumers = Consumers }) ->
27182716 case Consumers of
27192717 #{ConsumerKey := # consumer {checked_out = Checked }} ->
0 commit comments