14
14
-dialyzer (no_improper_lists ).
15
15
16
16
-include (" rabbit_fifo.hrl" ).
17
- -include_lib (" rabbit_common/include/rabbit.hrl" ).
18
17
19
18
-define (STATE , ? MODULE ).
20
19
@@ -299,7 +298,9 @@ apply(#{index := Idx} = Meta,
299
298
credit = increase_credit (Con0 , 1 )},
300
299
State1 = State0 #? STATE {ra_indexes = rabbit_fifo_index :delete (OldIdx ,
301
300
Indexes0 ),
302
- messages = lqueue :in (? MSG (Idx , Header ), Messages ),
301
+ messages = rabbit_fifo_q :in (lo ,
302
+ ? MSG (Idx , Header ),
303
+ Messages ),
303
304
enqueue_count = EnqCount + 1 },
304
305
State2 = update_or_remove_con (Meta , ConsumerKey , Con , State1 ),
305
306
{State , Ret , Effs } = checkout (Meta , State0 , State2 , []),
@@ -566,7 +567,7 @@ apply(#{index := Index}, #purge{},
566
567
end , Indexes0 , Returns )
567
568
end ,
568
569
State1 = State0 #? STATE {ra_indexes = Indexes ,
569
- messages = lqueue :new (),
570
+ messages = rabbit_fifo_q :new (),
570
571
messages_total = Total - NumReady ,
571
572
returns = lqueue :new (),
572
573
msg_bytes_enqueue = 0
@@ -736,7 +737,11 @@ apply(_Meta, Cmd, State) ->
736
737
rabbit_log :debug (" rabbit_fifo: unhandled command ~W " , [Cmd , 10 ]),
737
738
{State , ok , []}.
738
739
739
- convert_v3_to_v4 (#{system_time := Ts }, # rabbit_fifo {consumers = Consumers0 } = StateV3 ) ->
740
+ convert_v3_to_v4 (#{system_time := Ts },
741
+ StateV3 ) ->
742
+ Messages0 = rabbit_fifo_v3 :get_field (messages , StateV3 ),
743
+ Consumers0 = rabbit_fifo_v3 :get_field (consumers , StateV3 ),
744
+ Messages = rabbit_fifo_q :from_lqueue (Messages0 ),
740
745
Consumers = maps :map (
741
746
fun (_CKey , # consumer {checked_out = Ch0 } = C ) ->
742
747
Ch = maps :map (
@@ -745,7 +750,23 @@ convert_v3_to_v4(#{system_time := Ts}, #rabbit_fifo{consumers = Consumers0} = St
745
750
end , Ch0 ),
746
751
C # consumer {checked_out = Ch }
747
752
end , Consumers0 ),
748
- StateV3 #? MODULE {consumers = Consumers }.
753
+ #? MODULE {cfg = rabbit_fifo_v3 :get_field (cfg , StateV3 ),
754
+ messages = Messages ,
755
+ messages_total = rabbit_fifo_v3 :get_field (messages_total , StateV3 ),
756
+ returns = rabbit_fifo_v3 :get_field (returns , StateV3 ),
757
+ enqueue_count = rabbit_fifo_v3 :get_field (enqueue_count , StateV3 ),
758
+ enqueuers = rabbit_fifo_v3 :get_field (enqueuers , StateV3 ),
759
+ ra_indexes = rabbit_fifo_v3 :get_field (ra_indexes , StateV3 ),
760
+ release_cursors = rabbit_fifo_v3 :get_field (release_cursors , StateV3 ),
761
+ consumers = Consumers ,
762
+ % consumers that require further service are queued here
763
+ service_queue = rabbit_fifo_v3 :get_field (service_queue , StateV3 ),
764
+ dlx = rabbit_fifo_v3 :get_field (dlx , StateV3 ),
765
+ msg_bytes_enqueue = rabbit_fifo_v3 :get_field (msg_bytes_enqueue , StateV3 ),
766
+ msg_bytes_checkout = rabbit_fifo_v3 :get_field (msg_bytes_checkout , StateV3 ),
767
+ waiting_consumers = rabbit_fifo_v3 :get_field (waiting_consumers , StateV3 ),
768
+ last_active = rabbit_fifo_v3 :get_field (last_active , StateV3 ),
769
+ msg_cache = rabbit_fifo_v3 :get_field (msg_cache , StateV3 )}.
749
770
750
771
purge_node (Meta , Node , State , Effects ) ->
751
772
lists :foldl (fun (Pid , {S0 , E0 }) ->
@@ -1348,7 +1369,7 @@ is_v4() ->
1348
1369
1349
1370
messages_ready (#? STATE {messages = M ,
1350
1371
returns = R }) ->
1351
- lqueue :len (M ) + lqueue :len (R ).
1372
+ rabbit_fifo_q :len (M ) + lqueue :len (R ).
1352
1373
1353
1374
messages_total (#? STATE {messages_total = Total ,
1354
1375
dlx = DlxState }) ->
@@ -1599,19 +1620,6 @@ drop_head(#?STATE{ra_indexes = Indexes0} = State0, Effects) ->
1599
1620
{State0 , Effects }
1600
1621
end .
1601
1622
1602
- maybe_set_msg_ttl (# basic_message {content = # content {properties = none }},
1603
- RaCmdTs , Header ,
1604
- #? STATE {cfg = # cfg {msg_ttl = PerQueueMsgTTL }}) ->
1605
- update_expiry_header (RaCmdTs , PerQueueMsgTTL , Header );
1606
- maybe_set_msg_ttl (# basic_message {content = # content {properties = Props }},
1607
- RaCmdTs , Header ,
1608
- #? STATE {cfg = # cfg {msg_ttl = PerQueueMsgTTL }}) ->
1609
- % % rabbit_quorum_queue will leave the properties decoded if and only if
1610
- % % per message message TTL is set.
1611
- % % We already check in the channel that expiration must be valid.
1612
- {ok , PerMsgMsgTTL } = rabbit_basic :parse_expiration (Props ),
1613
- TTL = min (PerMsgMsgTTL , PerQueueMsgTTL ),
1614
- update_expiry_header (RaCmdTs , TTL , Header );
1615
1623
maybe_set_msg_ttl (Msg , RaCmdTs , Header ,
1616
1624
#? STATE {cfg = # cfg {msg_ttl = MsgTTL }}) ->
1617
1625
case mc :is (Msg ) of
@@ -1673,10 +1681,11 @@ maybe_enqueue(RaftIdx, Ts, undefined, undefined, RawMsg, Effects,
1673
1681
Size = message_size (RawMsg ),
1674
1682
Header = maybe_set_msg_ttl (RawMsg , Ts , Size , State0 ),
1675
1683
Msg = ? MSG (RaftIdx , Header ),
1684
+ PTag = priority_tag (RawMsg ),
1676
1685
State = State0 #? STATE {msg_bytes_enqueue = Enqueue + Size ,
1677
1686
enqueue_count = EnqCount + 1 ,
1678
1687
messages_total = Total + 1 ,
1679
- messages = lqueue :in (Msg , Messages )
1688
+ messages = rabbit_fifo_q :in (PTag , Msg , Messages )
1680
1689
},
1681
1690
{ok , State , Effects };
1682
1691
maybe_enqueue (RaftIdx , Ts , From , MsgSeqNo , RawMsg , Effects0 ,
@@ -1704,10 +1713,11 @@ maybe_enqueue(RaftIdx, Ts, From, MsgSeqNo, RawMsg, Effects0,
1704
1713
false ->
1705
1714
undefined
1706
1715
end ,
1716
+ PTag = priority_tag (RawMsg ),
1707
1717
State = State0 #? STATE {msg_bytes_enqueue = Enqueue + Size ,
1708
1718
enqueue_count = EnqCount + 1 ,
1709
1719
messages_total = Total + 1 ,
1710
- messages = lqueue :in (Msg , Messages ),
1720
+ messages = rabbit_fifo_q :in (PTag , Msg , Messages ),
1711
1721
enqueuers = Enqueuers0 #{From => Enq },
1712
1722
msg_cache = MsgCache
1713
1723
},
@@ -1824,10 +1834,10 @@ update_smallest_raft_index(IncomingRaftIdx, Reply,
1824
1834
#? STATE {cfg = Cfg ,
1825
1835
release_cursors = Cursors0 } = State0 ,
1826
1836
Effects ) ->
1827
- Total = messages_total (State0 ),
1837
+ % Total = messages_total(State0),
1828
1838
% % TODO: optimise
1829
1839
case smallest_raft_index (State0 ) of
1830
- undefined when Total == 0 ->
1840
+ undefined ->
1831
1841
% there are no messages on queue anymore and no pending enqueues
1832
1842
% we can forward release_cursor all the way until
1833
1843
% the last received command, hooray
@@ -1838,8 +1848,8 @@ update_smallest_raft_index(IncomingRaftIdx, Reply,
1838
1848
release_cursors = lqueue :new (),
1839
1849
enqueue_count = 0 },
1840
1850
{State , Reply , Effects ++ [{release_cursor , IncomingRaftIdx , State }]};
1841
- undefined ->
1842
- {State0 , Reply , Effects };
1851
+ % undefined ->
1852
+ % {State0, Reply, Effects};
1843
1853
Smallest when is_integer (Smallest ) ->
1844
1854
case find_next_cursor (Smallest , Cursors0 ) of
1845
1855
empty ->
@@ -2066,10 +2076,10 @@ take_next_msg(#?STATE{returns = Returns0,
2066
2076
{{value , NextMsg }, Returns } ->
2067
2077
{NextMsg , State #? STATE {returns = Returns }};
2068
2078
{empty , _ } ->
2069
- case lqueue :out (Messages0 ) of
2079
+ case rabbit_fifo_q :out (Messages0 ) of
2070
2080
{empty , _ } ->
2071
2081
empty ;
2072
- {{ value , ? MSG (RaftIdx , _ ) = Msg } , Messages } ->
2082
+ {_P , ? MSG (RaftIdx , _ ) = Msg , Messages } ->
2073
2083
% % add index here
2074
2084
Indexes = rabbit_fifo_index :append (RaftIdx , Indexes0 ),
2075
2085
{Msg , State #? STATE {messages = Messages ,
@@ -2081,7 +2091,7 @@ get_next_msg(#?STATE{returns = Returns0,
2081
2091
messages = Messages0 }) ->
2082
2092
case lqueue :get (Returns0 , empty ) of
2083
2093
empty ->
2084
- lqueue :get (Messages0 , empty );
2094
+ rabbit_fifo_q :get (Messages0 );
2085
2095
Msg ->
2086
2096
Msg
2087
2097
end .
@@ -2176,7 +2186,7 @@ checkout_one(#{system_time := Ts} = Meta, ExpiredMsg0, InitState0, Effects0) ->
2176
2186
checkout_one (Meta , ExpiredMsg ,
2177
2187
InitState #? STATE {service_queue = SQ1 }, Effects1 );
2178
2188
{empty , _ } ->
2179
- case lqueue :len (Messages0 ) of
2189
+ case rabbit_fifo_q :len (Messages0 ) of
2180
2190
0 ->
2181
2191
{nochange , ExpiredMsg , InitState , Effects1 };
2182
2192
_ ->
@@ -2410,9 +2420,10 @@ normalize(#?STATE{ra_indexes = _Indexes,
2410
2420
release_cursors = Cursors ,
2411
2421
dlx = DlxState } = State ) ->
2412
2422
State #? STATE {returns = lqueue :from_list (lqueue :to_list (Returns )),
2413
- messages = lqueue :from_list (lqueue :to_list (Messages )),
2414
- release_cursors = lqueue :from_list (lqueue :to_list (Cursors )),
2415
- dlx = rabbit_fifo_dlx :normalize (DlxState )}.
2423
+ messages = rabbit_fifo_q :normalize (Messages ,
2424
+ rabbit_fifo_q :new ()),
2425
+ release_cursors = lqueue :from_list (lqueue :to_list (Cursors )),
2426
+ dlx = rabbit_fifo_dlx :normalize (DlxState )}.
2416
2427
2417
2428
is_over_limit (#? STATE {cfg = # cfg {max_length = undefined ,
2418
2429
max_bytes = undefined }}) ->
@@ -2520,9 +2531,6 @@ add_bytes_return(Header,
2520
2531
State #? STATE {msg_bytes_checkout = Checkout - Size ,
2521
2532
msg_bytes_enqueue = Enqueue + Size }.
2522
2533
2523
- message_size (# basic_message {content = Content }) ->
2524
- # content {payload_fragments_rev = PFR } = Content ,
2525
- iolist_size (PFR );
2526
2534
message_size (B ) when is_binary (B ) ->
2527
2535
byte_size (B );
2528
2536
message_size (Msg ) ->
@@ -2644,12 +2652,7 @@ smallest_raft_index(#?STATE{messages = Messages,
2644
2652
ra_indexes = Indexes ,
2645
2653
dlx = DlxState }) ->
2646
2654
SmallestDlxRaIdx = rabbit_fifo_dlx :smallest_raft_index (DlxState ),
2647
- SmallestMsgsRaIdx = case lqueue :get (Messages , undefined ) of
2648
- ? MSG (I , _ ) when is_integer (I ) ->
2649
- I ;
2650
- _ ->
2651
- undefined
2652
- end ,
2655
+ SmallestMsgsRaIdx = rabbit_fifo_q :get_lowest_index (Messages ),
2653
2656
SmallestRaIdx = rabbit_fifo_index :smallest (Indexes ),
2654
2657
lists :min ([SmallestDlxRaIdx , SmallestMsgsRaIdx , SmallestRaIdx ]).
2655
2658
@@ -2791,3 +2794,16 @@ maps_search(Pred, {K, V, I}) ->
2791
2794
end ;
2792
2795
maps_search (Pred , Map ) when is_map (Map ) ->
2793
2796
maps_search (Pred , maps :next (maps :iterator (Map ))).
2797
+
2798
+ priority_tag (Msg ) ->
2799
+ case mc :is (Msg ) of
2800
+ true ->
2801
+ case mc :priority (Msg ) of
2802
+ P when P > 4 ->
2803
+ hi ;
2804
+ _ ->
2805
+ lo
2806
+ end ;
2807
+ false ->
2808
+ lo
2809
+ end .
0 commit comments