92
92
seq :: option (msg_seqno ()),
93
93
msg :: raw_msg ()}).
94
94
-record (? ENQ_V2 , {seq :: option (msg_seqno ()),
95
- msg :: raw_msg ()}).
95
+ msg :: raw_msg (),
96
+ size :: {MetadataSize :: non_neg_integer (),
97
+ PayloadSize :: non_neg_integer ()}}).
96
98
-record (requeue , {consumer_key :: consumer_key (),
97
99
msg_id :: msg_id (),
98
100
index :: ra :index (),
@@ -208,10 +210,10 @@ update_config(Conf, State) ->
208
210
{state (), ra_machine :reply ()}.
209
211
apply (Meta , # enqueue {pid = From , seq = Seq ,
210
212
msg = RawMsg }, State00 ) ->
211
- apply_enqueue (Meta , From , Seq , RawMsg , State00 );
213
+ apply_enqueue (Meta , From , Seq , RawMsg , message_size ( RawMsg ), State00 );
212
214
apply (#{reply_mode := {notify , _Corr , EnqPid }} = Meta ,
213
- #? ENQ_V2 {seq = Seq , msg = RawMsg }, State00 ) ->
214
- apply_enqueue (Meta , EnqPid , Seq , RawMsg , State00 );
215
+ #? ENQ_V2 {seq = Seq , msg = RawMsg , size = Size }, State00 ) ->
216
+ apply_enqueue (Meta , EnqPid , Seq , RawMsg , Size , State00 );
215
217
apply (_Meta , # register_enqueuer {pid = Pid },
216
218
#? STATE {enqueuers = Enqueuers0 ,
217
219
cfg = # cfg {overflow_strategy = Overflow }} = State0 ) ->
@@ -1592,8 +1594,9 @@ maybe_return_all(#{system_time := Ts} = Meta, ConsumerKey,
1592
1594
end .
1593
1595
1594
1596
apply_enqueue (#{index := RaftIdx ,
1595
- system_time := Ts } = Meta , From , Seq , RawMsg , State0 ) ->
1596
- case maybe_enqueue (RaftIdx , Ts , From , Seq , RawMsg , [], State0 ) of
1597
+ system_time := Ts } = Meta , From ,
1598
+ Seq , RawMsg , Size , State0 ) ->
1599
+ case maybe_enqueue (RaftIdx , Ts , From , Seq , RawMsg , Size , [], State0 ) of
1597
1600
{ok , State1 , Effects1 } ->
1598
1601
{State , ok , Effects } = checkout (Meta , State0 , State1 , Effects1 ),
1599
1602
{maybe_store_release_cursor (RaftIdx , State ), ok , Effects };
@@ -1672,13 +1675,14 @@ maybe_store_release_cursor(RaftIdx,
1672
1675
maybe_store_release_cursor (_RaftIdx , State ) ->
1673
1676
State .
1674
1677
1675
- maybe_enqueue (RaftIdx , Ts , undefined , undefined , RawMsg , Effects ,
1676
- #? STATE {msg_bytes_enqueue = Enqueue ,
1677
- enqueue_count = EnqCount ,
1678
- messages = Messages ,
1679
- messages_total = Total } = State0 ) ->
1678
+ maybe_enqueue (RaftIdx , Ts , undefined , undefined , RawMsg ,
1679
+ {_MetaSize , BodySize },
1680
+ Effects , #? STATE {msg_bytes_enqueue = Enqueue ,
1681
+ enqueue_count = EnqCount ,
1682
+ messages = Messages ,
1683
+ messages_total = Total } = State0 ) ->
1680
1684
% direct enqueue without tracking
1681
- Size = message_size ( RawMsg ) ,
1685
+ Size = BodySize ,
1682
1686
Header = maybe_set_msg_ttl (RawMsg , Ts , Size , State0 ),
1683
1687
Msg = ? MSG (RaftIdx , Header ),
1684
1688
PTag = priority_tag (RawMsg ),
@@ -1688,22 +1692,24 @@ maybe_enqueue(RaftIdx, Ts, undefined, undefined, RawMsg, Effects,
1688
1692
messages = rabbit_fifo_q :in (PTag , Msg , Messages )
1689
1693
},
1690
1694
{ok , State , Effects };
1691
- maybe_enqueue (RaftIdx , Ts , From , MsgSeqNo , RawMsg , Effects0 ,
1692
- #? STATE {msg_bytes_enqueue = Enqueue ,
1693
- enqueue_count = EnqCount ,
1694
- enqueuers = Enqueuers0 ,
1695
- messages = Messages ,
1696
- messages_total = Total } = State0 ) ->
1695
+ maybe_enqueue (RaftIdx , Ts , From , MsgSeqNo , RawMsg ,
1696
+ {_MetaSize , BodySize } = Size ,
1697
+ Effects0 , #? STATE {msg_bytes_enqueue = Enqueue ,
1698
+ enqueue_count = EnqCount ,
1699
+ enqueuers = Enqueuers0 ,
1700
+ messages = Messages ,
1701
+ messages_total = Total } = State0 ) ->
1697
1702
1698
1703
case maps :get (From , Enqueuers0 , undefined ) of
1699
1704
undefined ->
1700
1705
State1 = State0 #? STATE {enqueuers = Enqueuers0 #{From => # enqueuer {}}},
1701
1706
{Res , State , Effects } = maybe_enqueue (RaftIdx , Ts , From , MsgSeqNo ,
1702
- RawMsg , Effects0 , State1 ),
1707
+ RawMsg , Size , Effects0 ,
1708
+ State1 ),
1703
1709
{Res , State , [{monitor , process , From } | Effects ]};
1704
1710
# enqueuer {next_seqno = MsgSeqNo } = Enq0 ->
1705
1711
% it is the next expected seqno
1706
- Size = message_size ( RawMsg ) ,
1712
+ Size = BodySize ,
1707
1713
Header = maybe_set_msg_ttl (RawMsg , Ts , Size , State0 ),
1708
1714
Msg = ? MSG (RaftIdx , Header ),
1709
1715
Enq = Enq0 # enqueuer {next_seqno = MsgSeqNo + 1 },
@@ -2458,7 +2464,9 @@ make_enqueue(Pid, Seq, Msg) ->
2458
2464
true when is_pid (Pid ) andalso
2459
2465
is_integer (Seq ) ->
2460
2466
% % more compact format
2461
- #? ENQ_V2 {seq = Seq , msg = Msg };
2467
+ #? ENQ_V2 {seq = Seq ,
2468
+ msg = Msg ,
2469
+ size = mc :size (Msg )};
2462
2470
_ ->
2463
2471
# enqueue {pid = Pid , seq = Seq , msg = Msg }
2464
2472
end .
0 commit comments