Skip to content

Commit dbb5301

Browse files
committed
wip
1 parent d016d0a commit dbb5301

File tree

4 files changed

+277
-15
lines changed

4 files changed

+277
-15
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,9 @@ apply(#{index := Idx} = Meta,
299299
credit = increase_credit(Con0, 1)},
300300
State1 = State0#?STATE{ra_indexes = rabbit_fifo_index:delete(OldIdx,
301301
Indexes0),
302-
messages = lqueue:in(?MSG(Idx, Header), Messages),
302+
messages = rabbit_fifo_q:in(lo,
303+
?MSG(Idx, Header),
304+
Messages),
303305
enqueue_count = EnqCount + 1},
304306
State2 = update_or_remove_con(Meta, ConsumerKey, Con, State1),
305307
{State, Ret, Effs} = checkout(Meta, State0, State2, []),
@@ -566,7 +568,7 @@ apply(#{index := Index}, #purge{},
566568
end, Indexes0, Returns)
567569
end,
568570
State1 = State0#?STATE{ra_indexes = Indexes,
569-
messages = lqueue:new(),
571+
messages = rabbit_fifo_q:new(),
570572
messages_total = Total - NumReady,
571573
returns = lqueue:new(),
572574
msg_bytes_enqueue = 0
@@ -736,7 +738,10 @@ apply(_Meta, Cmd, State) ->
736738
rabbit_log:debug("rabbit_fifo: unhandled command ~W", [Cmd, 10]),
737739
{State, ok, []}.
738740

739-
convert_v3_to_v4(#{system_time := Ts}, #rabbit_fifo{consumers = Consumers0} = StateV3) ->
741+
convert_v3_to_v4(#{system_time := Ts},
742+
#rabbit_fifo{messages = Messages0,
743+
consumers = Consumers0} = StateV3) ->
744+
Messages = rabbit_fifo_q:from_lqueue(Messages0),
740745
Consumers = maps:map(
741746
fun (_CKey, #consumer{checked_out = Ch0} = C) ->
742747
Ch = maps:map(
@@ -745,7 +750,8 @@ convert_v3_to_v4(#{system_time := Ts}, #rabbit_fifo{consumers = Consumers0} = St
745750
end, Ch0),
746751
C#consumer{checked_out = Ch}
747752
end, Consumers0),
748-
StateV3#?MODULE{consumers = Consumers}.
753+
StateV3#?MODULE{messages = Messages,
754+
consumers = Consumers}.
749755

750756
purge_node(Meta, Node, State, Effects) ->
751757
lists:foldl(fun(Pid, {S0, E0}) ->
@@ -1348,7 +1354,7 @@ is_v4() ->
13481354

13491355
messages_ready(#?STATE{messages = M,
13501356
returns = R}) ->
1351-
lqueue:len(M) + lqueue:len(R).
1357+
rabbit_fifo_q:len(M) + lqueue:len(R).
13521358

13531359
messages_total(#?STATE{messages_total = Total,
13541360
dlx = DlxState}) ->
@@ -1673,10 +1679,11 @@ maybe_enqueue(RaftIdx, Ts, undefined, undefined, RawMsg, Effects,
16731679
Size = message_size(RawMsg),
16741680
Header = maybe_set_msg_ttl(RawMsg, Ts, Size, State0),
16751681
Msg = ?MSG(RaftIdx, Header),
1682+
PTag = priority_tag(RawMsg),
16761683
State = State0#?STATE{msg_bytes_enqueue = Enqueue + Size,
16771684
enqueue_count = EnqCount + 1,
16781685
messages_total = Total + 1,
1679-
messages = lqueue:in(Msg, Messages)
1686+
messages = rabbit_fifo_q:in(PTag, Msg, Messages)
16801687
},
16811688
{ok, State, Effects};
16821689
maybe_enqueue(RaftIdx, Ts, From, MsgSeqNo, RawMsg, Effects0,
@@ -1704,10 +1711,11 @@ maybe_enqueue(RaftIdx, Ts, From, MsgSeqNo, RawMsg, Effects0,
17041711
false ->
17051712
undefined
17061713
end,
1714+
PTag = priority_tag(RawMsg),
17071715
State = State0#?STATE{msg_bytes_enqueue = Enqueue + Size,
17081716
enqueue_count = EnqCount + 1,
17091717
messages_total = Total + 1,
1710-
messages = lqueue:in(Msg, Messages),
1718+
messages = rabbit_fifo_q:in(PTag, Msg, Messages),
17111719
enqueuers = Enqueuers0#{From => Enq},
17121720
msg_cache = MsgCache
17131721
},
@@ -2066,7 +2074,7 @@ take_next_msg(#?STATE{returns = Returns0,
20662074
{{value, NextMsg}, Returns} ->
20672075
{NextMsg, State#?STATE{returns = Returns}};
20682076
{empty, _} ->
2069-
case lqueue:out(Messages0) of
2077+
case rabbit_fifo_q:out(Messages0) of
20702078
{empty, _} ->
20712079
empty;
20722080
{{value, ?MSG(RaftIdx, _) = Msg}, Messages} ->
@@ -2081,7 +2089,7 @@ get_next_msg(#?STATE{returns = Returns0,
20812089
messages = Messages0}) ->
20822090
case lqueue:get(Returns0, empty) of
20832091
empty ->
2084-
lqueue:get(Messages0, empty);
2092+
rabbit_fifo_q:get(Messages0);
20852093
Msg ->
20862094
Msg
20872095
end.
@@ -2176,7 +2184,7 @@ checkout_one(#{system_time := Ts} = Meta, ExpiredMsg0, InitState0, Effects0) ->
21762184
checkout_one(Meta, ExpiredMsg,
21772185
InitState#?STATE{service_queue = SQ1}, Effects1);
21782186
{empty, _} ->
2179-
case lqueue:len(Messages0) of
2187+
case rabbit_fifo_q:len(Messages0) of
21802188
0 ->
21812189
{nochange, ExpiredMsg, InitState, Effects1};
21822190
_ ->
@@ -2410,9 +2418,9 @@ normalize(#?STATE{ra_indexes = _Indexes,
24102418
release_cursors = Cursors,
24112419
dlx = DlxState} = State) ->
24122420
State#?STATE{returns = lqueue:from_list(lqueue:to_list(Returns)),
2413-
messages = lqueue:from_list(lqueue:to_list(Messages)),
2414-
release_cursors = lqueue:from_list(lqueue:to_list(Cursors)),
2415-
dlx = rabbit_fifo_dlx:normalize(DlxState)}.
2421+
messages = rabbit_fifo_q:from_lqueue(Messages),
2422+
release_cursors = lqueue:from_list(lqueue:to_list(Cursors)),
2423+
dlx = rabbit_fifo_dlx:normalize(DlxState)}.
24162424

24172425
is_over_limit(#?STATE{cfg = #cfg{max_length = undefined,
24182426
max_bytes = undefined}}) ->
@@ -2644,7 +2652,7 @@ smallest_raft_index(#?STATE{messages = Messages,
26442652
ra_indexes = Indexes,
26452653
dlx = DlxState}) ->
26462654
SmallestDlxRaIdx = rabbit_fifo_dlx:smallest_raft_index(DlxState),
2647-
SmallestMsgsRaIdx = case lqueue:get(Messages, undefined) of
2655+
SmallestMsgsRaIdx = case rabbit_fifo_q:get(Messages) of
26482656
?MSG(I, _) when is_integer(I) ->
26492657
I;
26502658
_ ->
@@ -2791,3 +2799,16 @@ maps_search(Pred, {K, V, I}) ->
27912799
end;
27922800
maps_search(Pred, Map) when is_map(Map) ->
27932801
maps_search(Pred, maps:next(maps:iterator(Map))).
2802+
2803+
priority_tag(Msg) ->
2804+
case mc:is(Msg) of
2805+
true ->
2806+
case mc:priority(Msg) of
2807+
P when P > 4 ->
2808+
hi;
2809+
_ ->
2810+
lo
2811+
end;
2812+
false ->
2813+
lo
2814+
end.

deps/rabbit/src/rabbit_fifo.hrl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@
181181
-record(rabbit_fifo,
182182
{cfg :: #cfg{},
183183
% unassigned messages
184-
messages = lqueue:new() :: lqueue:lqueue(msg()),
184+
messages = rabbit_fifo_q:new() :: rabbit_fifo_q:state(),
185185
messages_total = 0 :: non_neg_integer(),
186186
% queue of returned msg_in_ids - when checking out it picks from
187187
returns = lqueue:new() :: lqueue:lqueue(term()),

deps/rabbit/src/rabbit_fifo_q.erl

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
-module(rabbit_fifo_q).
2+
3+
-export([
4+
new/0,
5+
in/3,
6+
out/1,
7+
get/1,
8+
len/1,
9+
from_lqueue/1
10+
]).
11+
12+
-define(WEIGHT, 2).
13+
14+
%% a simple weighted priority queue with only two priorities
15+
16+
-record(?MODULE, {hi = queue:new() :: queue:queue(),
17+
lo = queue:new() :: queue:queue(),
18+
len = 0 :: non_neg_integer(),
19+
dequeue_counter = 0 :: non_neg_integer()}).
20+
21+
-opaque state() :: #?MODULE{}.
22+
23+
-export_type([state/0]).
24+
25+
-spec new() -> state().
26+
new() ->
27+
#?MODULE{}.
28+
29+
-spec in(hi | lo, term(), state()) -> state().
30+
in(hi, Item, #?MODULE{hi = Hi, len = Len} = State) ->
31+
State#?MODULE{hi = queue:in(Item, Hi),
32+
len = Len + 1};
33+
in(lo, Item, #?MODULE{lo = Lo, len = Len} = State) ->
34+
State#?MODULE{lo = queue:in(Item, Lo),
35+
len = Len + 1}.
36+
37+
out(#?MODULE{len = 0} = S) ->
38+
{empty, S};
39+
out(#?MODULE{hi = Hi0,
40+
lo = Lo0,
41+
len = Len,
42+
dequeue_counter = C} = State) ->
43+
case ?WEIGHT == C of
44+
true ->
45+
%% try lo before hi
46+
case queue:out(Lo0) of
47+
{empty, _} ->
48+
{{value, _} = Ret, Hi} = queue:out(Hi0),
49+
{Ret, State#?MODULE{hi = Hi,
50+
dequeue_counter = 0,
51+
len = Len - 1}};
52+
{Ret, Lo} ->
53+
{Ret, State#?MODULE{lo = Lo,
54+
dequeue_counter = 0,
55+
len = Len - 1}}
56+
end;
57+
false ->
58+
case queue:out(Hi0) of
59+
{empty, _} ->
60+
{{value, _} = Ret, Lo} = queue:out(Lo0),
61+
{Ret, State#?MODULE{lo = Lo,
62+
dequeue_counter = C + 1,
63+
len = Len - 1}};
64+
{Ret, Hi} ->
65+
{Ret, State#?MODULE{hi = Hi,
66+
dequeue_counter = C + 1,
67+
len = Len - 1}}
68+
end
69+
end.
70+
71+
get(#?MODULE{len = 0}) ->
72+
empty;
73+
get(#?MODULE{hi = Hi0,
74+
lo = Lo0,
75+
dequeue_counter = C}) ->
76+
case ?WEIGHT == C of
77+
true ->
78+
%% try lo before hi
79+
case queue:peek(Lo0) of
80+
empty ->
81+
queue:peek(Hi0);
82+
{value, _} = Ret ->
83+
Ret
84+
end;
85+
false ->
86+
case queue:peek(Hi0) of
87+
empty ->
88+
queue:peek(Lo0);
89+
{value, _} = Ret ->
90+
Ret
91+
end
92+
end.
93+
94+
len(#?MODULE{len = Len}) ->
95+
Len.
96+
97+
from_lqueue(LQ) ->
98+
lqueue:fold(
99+
fun (Item, Acc) ->
100+
in(lo, Item, Acc)
101+
end, new(), LQ).
102+
103+
%% internals
104+
105+
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
-module(rabbit_fifo_q_SUITE).
2+
3+
-compile(nowarn_export_all).
4+
-compile(export_all).
5+
6+
-export([
7+
]).
8+
9+
-include_lib("proper/include/proper.hrl").
10+
-include_lib("common_test/include/ct.hrl").
11+
-include_lib("eunit/include/eunit.hrl").
12+
13+
14+
all() ->
15+
[
16+
{group, tests}
17+
].
18+
19+
20+
all_tests() ->
21+
[
22+
basics,
23+
single_priority_behaves_like_queue
24+
].
25+
26+
27+
groups() ->
28+
[
29+
{tests, [], all_tests()}
30+
].
31+
32+
init_per_suite(Config) ->
33+
Config.
34+
35+
end_per_suite(_Config) ->
36+
ok.
37+
38+
init_per_group(_Group, Config) ->
39+
Config.
40+
41+
end_per_group(_Group, _Config) ->
42+
ok.
43+
44+
init_per_testcase(_TestCase, Config) ->
45+
Config.
46+
47+
end_per_testcase(_TestCase, _Config) ->
48+
ok.
49+
50+
%%%===================================================================
51+
%%% Test cases
52+
%%%===================================================================
53+
54+
basics(_Config) ->
55+
Q0 = rabbit_fifo_q:new(),
56+
Q1 = lists:foldl(
57+
fun ({P, I}, Q) ->
58+
rabbit_fifo_q:in(P, I, Q)
59+
end, Q0, [
60+
{hi, hi1},
61+
{lo, lo1},
62+
{hi, hi2},
63+
{lo, lo2},
64+
{hi, hi3}
65+
]),
66+
{{value, hi1}, Q2} = rabbit_fifo_q:out(Q1),
67+
{{value, hi2}, Q3} = rabbit_fifo_q:out(Q2),
68+
{{value, lo1}, Q4} = rabbit_fifo_q:out(Q3),
69+
{{value, hi3}, Q5} = rabbit_fifo_q:out(Q4),
70+
{{value, lo2}, _Q6} = rabbit_fifo_q:out(Q5),
71+
ok.
72+
73+
74+
75+
-type op() :: {in, integer()} | out.
76+
77+
single_priority_behaves_like_queue(_Config) ->
78+
run_proper(
79+
fun () ->
80+
?FORALL({P, Ops}, {oneof([hi, lo]), op_gen(256)},
81+
queue_prop(P, Ops))
82+
end, [], 25),
83+
ok.
84+
85+
queue_prop(P, Ops) ->
86+
ct:pal("Running queue_prop for ~s", [P]),
87+
Que = queue:new(),
88+
Sut = rabbit_fifo_q:new(),
89+
{Queue, FifoQ} = lists:foldl(
90+
fun ({in, V}, {Q0, S0}) ->
91+
Q = queue:in(V, Q0),
92+
S = rabbit_fifo_q:in(P, V, S0),
93+
case queue:len(Q) == rabbit_fifo_q:len(S) of
94+
true ->
95+
{Q, S};
96+
false ->
97+
throw(false)
98+
end;
99+
(out, {Q0, S0}) ->
100+
{V1, Q} = queue:out(Q0),
101+
{V2, S} = rabbit_fifo_q:out(S0),
102+
case V1 == V2 of
103+
true ->
104+
{Q, S};
105+
false ->
106+
throw(false)
107+
end
108+
end, {Que, Sut}, Ops),
109+
110+
queue:len(Queue) == rabbit_fifo_q:len(FifoQ).
111+
112+
113+
114+
115+
%%% helpers
116+
117+
op_gen(Size) ->
118+
?LET(Ops,
119+
resize(Size,
120+
list(
121+
frequency(
122+
[
123+
{20, {in, non_neg_integer()}},
124+
{20, out}
125+
]
126+
))), Ops
127+
).
128+
129+
run_proper(Fun, Args, NumTests) ->
130+
?assert(
131+
proper:counterexample(
132+
erlang:apply(Fun, Args),
133+
[{numtests, NumTests},
134+
{on_output, fun(".", _) -> ok; % don't print the '.'s on new lines
135+
(F, A) -> ct:pal(?LOW_IMPORTANCE, F, A)
136+
end}])).

0 commit comments

Comments
 (0)