2222-compile (inline ).
2323
2424-include_lib (" ra/include/ra.hrl" ).
25+ -include_lib (" rabbit_common/include/rabbit.hrl" ).
2526
2627-export ([
2728 init /1 ,
4243 query_consumers /1 ,
4344 usage /1 ,
4445
46+ zero /1 ,
47+
4548 % % misc
4649 dehydrate_state /1 ,
4750
226229 % % This is done so that consumers are still served in a deterministic
227230 % % order on recovery.
228231 prefix_msg_counts = {0 , 0 } :: {Return :: non_neg_integer (),
229- PrefixMsgs :: non_neg_integer ()}
232+ PrefixMsgs :: non_neg_integer ()},
233+ msg_bytes_enqueue = 0 :: non_neg_integer (),
234+ msg_bytes_checkout = 0 :: non_neg_integer ()
230235 }).
231236
232237- opaque state () :: # state {}.
@@ -266,6 +271,9 @@ update_state(Conf, State) ->
266271 become_leader_handler = BLH ,
267272 shadow_copy_interval = SHI }.
268273
274+ zero (_ ) ->
275+ 0 .
276+
269277% msg_ids are scoped per consumer
270278% ra_indexes holds all raft indexes for enqueues currently on queue
271279- spec apply (ra_machine :command_meta_data (), command (),
@@ -275,7 +283,8 @@ apply(#{index := RaftIdx}, #enqueue{pid = From, seq = Seq,
275283 msg = RawMsg }, Effects0 , State00 ) ->
276284 case maybe_enqueue (RaftIdx , From , Seq , RawMsg , Effects0 , State00 ) of
277285 {ok , State0 , Effects1 } ->
278- {State , Effects , ok } = checkout (State0 , Effects1 ),
286+ {State , Effects , ok } = checkout (add_bytes_enqueue (RawMsg , State0 ),
287+ Effects1 ),
279288 {append_to_master_index (RaftIdx , State ), Effects , ok };
280289 {duplicate , State , Effects } ->
281290 {State , Effects , ok }
@@ -543,26 +552,35 @@ tick(_Ts, #state{name = Name,
543552 queue_resource = QName ,
544553 messages = Messages ,
545554 ra_indexes = Indexes ,
546- consumers = Cons } = State ) ->
555+ consumers = Cons ,
556+ msg_bytes_enqueue = EnqueueBytes ,
557+ msg_bytes_checkout = CheckoutBytes } = State ) ->
547558 Metrics = {Name ,
548559 maps :size (Messages ), % Ready
549560 num_checked_out (State ), % checked out
550561 rabbit_fifo_index :size (Indexes ), % % Total
551- maps :size (Cons )}, % Consumers
562+ maps :size (Cons ), % Consumers
563+ EnqueueBytes ,
564+ CheckoutBytes },
552565 [{mod_call , rabbit_quorum_queue ,
553566 update_metrics , [QName , Metrics ]}, {aux , emit }].
554567
555568- spec overview (state ()) -> map ().
556569overview (# state {consumers = Cons ,
557570 enqueuers = Enqs ,
558571 messages = Messages ,
559- ra_indexes = Indexes } = State ) ->
572+ ra_indexes = Indexes ,
573+ msg_bytes_enqueue = EnqueueBytes ,
574+ msg_bytes_checkout = CheckoutBytes
575+ } = State ) ->
560576 #{type => ? MODULE ,
561577 num_consumers => maps :size (Cons ),
562578 num_checked_out => num_checked_out (State ),
563579 num_enqueuers => maps :size (Enqs ),
564580 num_ready_messages => maps :size (Messages ),
565- num_messages => rabbit_fifo_index :size (Indexes )}.
581+ num_messages => rabbit_fifo_index :size (Indexes ),
582+ enqueue_message_bytes => EnqueueBytes ,
583+ checkout_message_bytes => CheckoutBytes }.
566584
567585- spec get_checked_out (consumer_id (), msg_id (), msg_id (), state ()) ->
568586 [delivery_msg ()].
@@ -806,12 +824,17 @@ complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId,
806824 Checked = maps :without (MsgIds , Checked0 ),
807825 Discarded = maps :with (MsgIds , Checked0 ),
808826 MsgRaftIdxs = [RIdx || {_ , {RIdx , _ }} <- maps :values (Discarded )],
827+ State1 = lists :foldl (fun ({_ , {_ , {_ , RawMsg }}}, Acc ) ->
828+ add_bytes_settle (RawMsg , Acc );
829+ (_ , Acc ) ->
830+ Acc
831+ end , State0 , maps :values (Discarded )),
809832 % % need to pass the length of discarded as $prefix_msgs would be filtered
810833 % % by the above list comprehension
811- {State1 , Effects1 , _ } = complete (ConsumerId , MsgRaftIdxs ,
834+ {State2 , Effects1 , _ } = complete (ConsumerId , MsgRaftIdxs ,
812835 maps :size (Discarded ),
813- Con0 , Checked , Effects0 , State0 ),
814- {State , Effects , _ } = checkout (State1 , Effects1 ),
836+ Con0 , Checked , Effects0 , State1 ),
837+ {State , Effects , _ } = checkout (State2 , Effects1 ),
815838 % settle metrics are incremented separately
816839 update_smallest_raft_index (IncomingRaftIdx , Indexes0 , State , Effects ).
817840
@@ -873,8 +896,9 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
873896 1 , Header0 ),
874897 Msg = {RaftId , {Header , RawMsg }},
875898 % this should not affect the release cursor in any way
876- State0 # state {messages = maps :put (MsgNum , Msg , Messages ),
877- returns = lqueue :in (MsgNum , Returns )}.
899+ add_bytes_return (RawMsg ,
900+ State0 # state {messages = maps :put (MsgNum , Msg , Messages ),
901+ returns = lqueue :in (MsgNum , Returns )}).
878902
879903return_all (State , Checked ) ->
880904 maps :fold (fun (_ , '$prefix_msg' , S ) ->
@@ -993,13 +1017,16 @@ checkout_one(#state{service_queue = SQ0,
9931017 {Cons , SQ , []} = % we expect no effects
9941018 update_or_remove_sub (ConsumerId , Con ,
9951019 Cons0 , SQ1 , []),
996- State = State0 # state {service_queue = SQ ,
997- messages = Messages ,
998- consumers = Cons },
999- Msg = case ConsumerMsg of
1000- '$prefix_msg' -> '$prefix_msg' ;
1001- {_ , {_ , M }} -> M
1002- end ,
1020+ State1 = State0 # state {service_queue = SQ ,
1021+ messages = Messages ,
1022+ consumers = Cons },
1023+ {State , Msg } =
1024+ case ConsumerMsg of
1025+ '$prefix_msg' ->
1026+ {State1 , '$prefix_msg' };
1027+ {_ , {_ , {_ , RawMsg } = M }} ->
1028+ {add_bytes_checkout (RawMsg , State1 ), M }
1029+ end ,
10031030 {success , ConsumerId , Next , Msg , State };
10041031 error ->
10051032 % % consumer did not exist but was queued, recurse
@@ -1147,6 +1174,35 @@ make_purge() -> #purge{}.
11471174make_update_state (Config ) ->
11481175 # update_state {config = Config }.
11491176
1177+ add_bytes_enqueue (Msg , # state {msg_bytes_enqueue = Enqueue } = State ) ->
1178+ Bytes = message_size (Msg ),
1179+ State # state {msg_bytes_enqueue = Enqueue + Bytes }.
1180+
1181+ add_bytes_checkout (Msg , # state {msg_bytes_checkout = Checkout ,
1182+ msg_bytes_enqueue = Enqueue } = State ) ->
1183+ Bytes = message_size (Msg ),
1184+ State # state {msg_bytes_checkout = Checkout + Bytes ,
1185+ msg_bytes_enqueue = Enqueue - Bytes }.
1186+
1187+ add_bytes_settle (Msg , # state {msg_bytes_checkout = Checkout } = State ) ->
1188+ Bytes = message_size (Msg ),
1189+ State # state {msg_bytes_checkout = Checkout - Bytes }.
1190+
1191+ add_bytes_return (Msg , # state {msg_bytes_checkout = Checkout ,
1192+ msg_bytes_enqueue = Enqueue } = State ) ->
1193+ Bytes = message_size (Msg ),
1194+ State # state {msg_bytes_checkout = Checkout - Bytes ,
1195+ msg_bytes_enqueue = Enqueue + Bytes }.
1196+
1197+ message_size (# basic_message {content = Content }) ->
1198+ # content {payload_fragments_rev = PFR } = Content ,
1199+ iolist_size (PFR );
1200+ message_size (B ) when is_binary (B ) ->
1201+ byte_size (B );
1202+ message_size (Msg ) ->
1203+ % % probably only hit this for testing so ok to use erts_debug
1204+ erts_debug :size (Msg ).
1205+
11501206- ifdef (TEST ).
11511207- include_lib (" eunit/include/eunit.hrl" ).
11521208
@@ -1170,7 +1226,8 @@ make_update_state(Config) ->
11701226
11711227test_init (Name ) ->
11721228 init (#{name => Name ,
1173- queue_resource => queue_resource ,
1229+ queue_resource => rabbit_misc :r (" /" , queue ,
1230+ atom_to_binary (Name , utf8 )),
11741231 shadow_copy_interval => 0 }).
11751232
11761233enq_enq_checkout_test () ->
@@ -1529,13 +1586,15 @@ discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() ->
15291586tick_test () ->
15301587 Cid = {<<" c" >>, self ()},
15311588 Cid2 = {<<" c2" >>, self ()},
1532- {S0 , _ } = enq (1 , 1 , fst , test_init (test )),
1533- {S1 , _ } = enq (2 , 2 , snd , S0 ),
1589+ {S0 , _ } = enq (1 , 1 , << " fst" >> , test_init (? FUNCTION_NAME )),
1590+ {S1 , _ } = enq (2 , 2 , << " snd" >> , S0 ),
15341591 {S2 , {MsgId , _ }} = deq (3 , Cid , unsettled , S1 ),
15351592 {S3 , {_ , _ }} = deq (4 , Cid2 , unsettled , S2 ),
15361593 {S4 , _ , _ } = apply (meta (5 ), make_return (Cid , [MsgId ]), [], S3 ),
15371594
1538- [{mod_call , _ , _ , [_ , {test , 1 , 1 , 2 , 1 }]}, {aux , emit }] = tick (1 , S4 ),
1595+ [{mod_call , _ , _ ,
1596+ [# resource {},
1597+ {? FUNCTION_NAME , 1 , 1 , 2 , 1 , 3 , 3 }]}, {aux , emit }] = tick (1 , S4 ),
15391598 ok .
15401599
15411600enq_deq_snapshot_recover_test () ->
0 commit comments