4141 query_consumer_count /1 ,
4242 usage /1 ,
4343
44+ zero /1 ,
45+
4446 % % misc
4547 dehydrate_state /1
4648 ]).
4749
4850-ifdef (TEST ).
4951-export ([
50- metrics_handler /1
52+ metrics_handler /1 ,
53+ one /1
5154 ]).
5255-endif .
5356
186189 cancel_consumer_handler :: maybe (applied_mfa ()),
187190 become_leader_handler :: maybe (applied_mfa ()),
188191 metrics_handler :: maybe (applied_mfa ()),
192+ message_size_handler :: maybe (applied_mfa ()),
189193 % % This is a special field that is only used for snapshots
190194 % % It represents the number of queued messages at the time the
191195 % % dehydrated snapshot state was cached.
196200 % % it instead takes messages from the `messages' map.
197201 % % This is done so that consumers are still served in a deterministic
198202 % % order on recovery.
199- prefix_msg_count = 0 :: non_neg_integer ()
203+ prefix_msg_count = 0 :: non_neg_integer (),
204+ msg_bytes_enqueue = 0 :: non_neg_integer (),
205+ msg_bytes_checkout = 0 :: non_neg_integer ()
200206 }).
201207
202208- opaque state () :: # state {}.
206212 become_leader_handler => applied_mfa (),
207213 cancel_consumer_handler => applied_mfa (),
208214 metrics_handler => applied_mfa (),
209- shadow_copy_interval => non_neg_integer ()}.
215+ shadow_copy_interval => non_neg_integer (),
216+ message_size_handler => applied_mfa ()}.
210217
211218- export_type ([protocol / 0 ,
212219 delivery / 0 ,
@@ -231,11 +238,16 @@ update_state(Conf, State) ->
231238 BLH = maps :get (become_leader_handler , Conf , undefined ),
232239 MH = maps :get (metrics_handler , Conf , undefined ),
233240 SHI = maps :get (shadow_copy_interval , Conf , ? SHADOW_COPY_INTERVAL ),
241+ MSH = maps :get (message_size_handler , Conf , {? MODULE , zero , []}),
234242 State # state {dead_letter_handler = DLH ,
235243 cancel_consumer_handler = CCH ,
236244 become_leader_handler = BLH ,
237245 metrics_handler = MH ,
238- shadow_copy_interval = SHI }.
246+ shadow_copy_interval = SHI ,
247+ message_size_handler = MSH }.
248+
249+ zero (_ ) ->
250+ 0 .
239251
240252% msg_ids are scoped per consumer
241253% ra_indexes holds all raft indexes for enqueues currently on queue
@@ -245,7 +257,8 @@ update_state(Conf, State) ->
245257apply (#{index := RaftIdx }, {enqueue , From , Seq , RawMsg }, Effects0 , State00 ) ->
246258 case maybe_enqueue (RaftIdx , From , Seq , RawMsg , Effects0 , State00 ) of
247259 {ok , State0 , Effects1 } ->
248- {State , Effects , ok } = checkout (State0 , Effects1 ),
260+ {State , Effects , ok } = checkout (add_bytes_enqueue (RawMsg , State0 ),
261+ Effects1 ),
249262 {append_to_master_index (RaftIdx , State ), Effects , ok };
250263 {duplicate , State , Effects } ->
251264 {State , Effects , ok }
@@ -506,12 +519,16 @@ tick(_Ts, #state{name = Name,
506519 messages = Messages ,
507520 ra_indexes = Indexes ,
508521 metrics_handler = MH ,
509- consumers = Cons } = State ) ->
522+ consumers = Cons ,
523+ msg_bytes_enqueue = EnqueueBytes ,
524+ msg_bytes_checkout = CheckoutBytes } = State ) ->
510525 Metrics = {Name ,
511526 maps :size (Messages ), % Ready
512527 num_checked_out (State ), % checked out
513528 rabbit_fifo_index :size (Indexes ), % % Total
514- maps :size (Cons )}, % Consumers
529+ maps :size (Cons ), % Consumers
530+ EnqueueBytes ,
531+ CheckoutBytes },
515532 case MH of
516533 undefined ->
517534 [{aux , emit }];
@@ -523,13 +540,18 @@ tick(_Ts, #state{name = Name,
523540overview (# state {consumers = Cons ,
524541 enqueuers = Enqs ,
525542 messages = Messages ,
526- ra_indexes = Indexes } = State ) ->
543+ ra_indexes = Indexes ,
544+ msg_bytes_enqueue = EnqueueBytes ,
545+ msg_bytes_checkout = CheckoutBytes
546+ } = State ) ->
527547 #{type => ? MODULE ,
528548 num_consumers => maps :size (Cons ),
529549 num_checked_out => num_checked_out (State ),
530550 num_enqueuers => maps :size (Enqs ),
531551 num_ready_messages => maps :size (Messages ),
532- num_messages => rabbit_fifo_index :size (Indexes )}.
552+ num_messages => rabbit_fifo_index :size (Indexes ),
553+ enqueue_message_bytes => EnqueueBytes ,
554+ checkout_message_bytes => CheckoutBytes }.
533555
534556- spec get_checked_out (consumer_id (), msg_id (), msg_id (), state ()) ->
535557 [delivery_msg ()].
@@ -766,12 +788,17 @@ complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId,
766788 Checked = maps :without (MsgIds , Checked0 ),
767789 Discarded = maps :with (MsgIds , Checked0 ),
768790 MsgRaftIdxs = [RIdx || {_ , {RIdx , _ }} <- maps :values (Discarded )],
791+ State1 = lists :foldl (fun ({_ , {_ , {_ , RawMsg }}}, Acc ) ->
792+ add_bytes_settle (RawMsg , Acc );
793+ (_ , Acc ) ->
794+ Acc
795+ end , State0 , maps :values (Discarded )),
769796 % % need to pass the length of discarded as $prefix_msgs would be filtered
770797 % % by the above list comprehension
771- {State1 , Effects1 , _ } = complete (ConsumerId , MsgRaftIdxs ,
798+ {State2 , Effects1 , _ } = complete (ConsumerId , MsgRaftIdxs ,
772799 maps :size (Discarded ),
773- Con0 , Checked , Effects0 , State0 ),
774- {State , Effects , _ } = checkout (State1 , Effects1 ),
800+ Con0 , Checked , Effects0 , State1 ),
801+ {State , Effects , _ } = checkout (State2 , Effects1 ),
775802 % settle metrics are incremented separately
776803 update_smallest_raft_index (IncomingRaftIdx , Indexes0 , State , Effects ).
777804
@@ -834,8 +861,9 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
834861 1 , Header0 ),
835862 Msg = {RaftId , {Header , RawMsg }},
836863 % this should not affect the release cursor in any way
837- State0 # state {messages = maps :put (MsgNum , Msg , Messages ),
838- returns = queue :in (MsgNum , Returns )}.
864+ add_bytes_return (RawMsg ,
865+ State0 # state {messages = maps :put (MsgNum , Msg , Messages ),
866+ returns = queue :in (MsgNum , Returns )}).
839867
840868return_all (State , Checked ) ->
841869 maps :fold (fun (_ , '$prefix_msg' ,
@@ -941,14 +969,17 @@ checkout_one(#state{service_queue = SQ0,
941969 {Cons , SQ , []} = % we expect no effects
942970 update_or_remove_sub (ConsumerId , Con ,
943971 Cons0 , SQ1 , []),
944- State = State0 # state {service_queue = SQ ,
945- messages = Messages ,
946- prefix_msg_count = PrefMsgC ,
947- consumers = Cons },
948- Msg = case ConsumerMsg of
949- '$prefix_msg' -> '$prefix_msg' ;
950- {_ , {_ , M }} -> M
951- end ,
972+ State1 = State0 # state {service_queue = SQ ,
973+ messages = Messages ,
974+ prefix_msg_count = PrefMsgC ,
975+ consumers = Cons },
976+ {State , Msg } =
977+ case ConsumerMsg of
978+ '$prefix_msg' ->
979+ {State1 , '$prefix_msg' };
980+ {_ , {_ , {_ , RawMsg } = M }} ->
981+ {add_bytes_checkout (RawMsg , State1 ), M }
982+ end ,
952983 {success , ConsumerId , Next , Msg , State };
953984 error ->
954985 % % consumer did not exist but was queued, recurse
@@ -1052,6 +1083,29 @@ dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
10521083 Checked = maps :map (fun (_ , _ ) -> '$prefix_msg' end , Checked0 ),
10531084 Con # consumer {checked_out = Checked }.
10541085
1086+ add_bytes_enqueue (Msg , # state {message_size_handler = {M , F , A },
1087+ msg_bytes_enqueue = Enqueue } = State ) ->
1088+ Bytes = apply (M , F , [Msg | A ]),
1089+ State # state {msg_bytes_enqueue = Enqueue + Bytes }.
1090+
1091+ add_bytes_checkout (Msg , # state {message_size_handler = {M , F , A },
1092+ msg_bytes_checkout = Checkout ,
1093+ msg_bytes_enqueue = Enqueue } = State ) ->
1094+ Bytes = apply (M , F , [Msg | A ]),
1095+ State # state {msg_bytes_checkout = Checkout + Bytes ,
1096+ msg_bytes_enqueue = Enqueue - Bytes }.
1097+
1098+ add_bytes_settle (Msg , # state {message_size_handler = {M , F , A },
1099+ msg_bytes_checkout = Checkout } = State ) ->
1100+ Bytes = apply (M , F , [Msg | A ]),
1101+ State # state {msg_bytes_checkout = Checkout - Bytes }.
1102+
1103+ add_bytes_return (Msg , # state {message_size_handler = {M , F , A },
1104+ msg_bytes_checkout = Checkout ,
1105+ msg_bytes_enqueue = Enqueue } = State ) ->
1106+ Bytes = apply (M , F , [Msg | A ]),
1107+ State # state {msg_bytes_checkout = Checkout - Bytes ,
1108+ msg_bytes_enqueue = Enqueue + Bytes }.
10551109
10561110- ifdef (TEST ).
10571111- include_lib (" eunit/include/eunit.hrl" ).
@@ -1079,6 +1133,12 @@ test_init(Name) ->
10791133 shadow_copy_interval => 0 ,
10801134 metrics_handler => {? MODULE , metrics_handler , []}}).
10811135
1136+ test_init (Name , MsgSizeHandler ) ->
1137+ init (#{name => Name ,
1138+ shadow_copy_interval => 0 ,
1139+ metrics_handler => {? MODULE , metrics_handler , []},
1140+ message_size_handler => MsgSizeHandler }).
1141+
10821142metrics_handler (_ ) ->
10831143 ok .
10841144
@@ -1416,16 +1476,19 @@ discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() ->
14161476 Effects2 ),
14171477 ok .
14181478
1479+ one (_ ) ->
1480+ 1 .
1481+
14191482tick_test () ->
14201483 Cid = {<<" c" >>, self ()},
14211484 Cid2 = {<<" c2" >>, self ()},
1422- {S0 , _ } = enq (1 , 1 , fst , test_init (test )),
1485+ {S0 , _ } = enq (1 , 1 , fst , test_init (test , { ? MODULE , one , []} )),
14231486 {S1 , _ } = enq (2 , 2 , snd , S0 ),
14241487 {S2 , {MsgId , _ }} = deq (3 , Cid , unsettled , S1 ),
14251488 {S3 , {_ , _ }} = deq (4 , Cid2 , unsettled , S2 ),
14261489 {S4 , _ , _ } = apply (meta (5 ), {return , [MsgId ], Cid }, [], S3 ),
14271490
1428- [{mod_call , _ , _ , [{test , 1 , 1 , 2 , 1 }]}, {aux , emit }] = tick (1 , S4 ),
1491+ [{mod_call , _ , _ , [{test , 1 , 1 , 2 , 1 , 1 , 1 }]}, {aux , emit }] = tick (1 , S4 ),
14291492 ok .
14301493
14311494enq_deq_snapshot_recover_test () ->
0 commit comments