Skip to content

Commit b97ccfc

Browse files
committed
CQ: No longer limit reads to a single segment
This should reduce the number of calls we make to the queue's index/store and therefore improve performance.
1 parent 28282f8 commit b97ccfc

File tree

3 files changed

+49
-69
lines changed

3 files changed

+49
-69
lines changed

deps/rabbit/src/rabbit_classic_queue_index_v2.erl

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
%% queue implementation itself.
2121
%% @todo TODO
2222
-export([sync/1, needs_sync/1,
23-
bounds/2, next_segment_boundary/1]).
23+
bounds/2]).
2424

2525
%% Called by rabbit_vhost.
2626
-export([all_queue_directory_names/1]).
@@ -762,11 +762,6 @@ ack_delete_fold_fun(SeqId, Write, {Buffer, Updates, Deletes, SegmentEntryCount})
762762
Deletes, SegmentEntryCount}
763763
end.
764764

765-
%% A better interface for read/3 would be to request a maximum
766-
%% of N messages, rather than first call next_segment_boundary/3
767-
%% and then read from S1 to S2. This function could then return
768-
%% either N messages or less depending on the current state.
769-
770765
-spec read(rabbit_variable_queue:seq_id(),
771766
rabbit_variable_queue:seq_id(),
772767
State) ->
@@ -1069,16 +1064,6 @@ bounds(State = #qi{ segments = Segments }, NextSeqIdHint) ->
10691064
State}
10701065
end.
10711066

1072-
%% The next_segment_boundary/1 function is used internally when
1073-
%% reading. It should not be called from rabbit_variable_queue.
1074-
1075-
-spec next_segment_boundary(SeqId) -> SeqId when SeqId::rabbit_variable_queue:seq_id().
1076-
1077-
next_segment_boundary(SeqId) ->
1078-
?DEBUG("~0p", [SeqId]),
1079-
SegmentEntryCount = segment_entry_count(),
1080-
(1 + (SeqId div SegmentEntryCount)) * SegmentEntryCount.
1081-
10821067
%% ----
10831068
%%
10841069
%% Internal.
@@ -1089,6 +1074,10 @@ segment_entry_count() ->
10891074
%% producer produces.
10901075
persistent_term:get(classic_queue_index_v2_segment_entry_count, 4096).
10911076

1077+
next_segment_boundary(SeqId) ->
1078+
SegmentEntryCount = segment_entry_count(),
1079+
(1 + (SeqId div SegmentEntryCount)) * SegmentEntryCount.
1080+
10921081
%% Note that store files will also be removed if there are any in this directory.
10931082
%% Currently the v2 per-queue store expects this function to remove its own files.
10941083

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1864,11 +1864,8 @@ read_from_q_tail(DelsAndAcksFun,
18641864
%% For v2 we want to limit the number of messages read at once to lower
18651865
%% the memory footprint. We use the consume rate to determine how many
18661866
%% messages we read.
1867-
%% @todo Simply ask for N messages instead of low/high bounds.
18681867
QTailSeqLimit = QTailSeqId + MemoryLimit,
1869-
QTailSeqId1 =
1870-
lists:min([rabbit_classic_queue_index_v2:next_segment_boundary(QTailSeqId),
1871-
QTailSeqLimit, QTailSeqIdEnd]),
1868+
QTailSeqId1 = min(QTailSeqLimit, QTailSeqIdEnd),
18721869
{List0, IndexState1} = rabbit_classic_queue_index_v2:read(QTailSeqId, QTailSeqId1, IndexState),
18731870
{List, StoreState3, MCStateP3, MCStateT3} = case WhatToRead of
18741871
messages ->

deps/rabbit/test/backing_queue_SUITE.erl

Lines changed: 43 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -796,9 +796,12 @@ bq_queue_index(Config) ->
796796
index_mod() ->
797797
rabbit_classic_queue_index_v2.
798798

799+
segment_size() ->
800+
persistent_term:get(classic_queue_index_v2_segment_entry_count, 4096).
801+
799802
bq_queue_index1(_Config) ->
800803
IndexMod = index_mod(),
801-
SegmentSize = IndexMod:next_segment_boundary(0),
804+
SegmentSize = segment_size(),
802805
TwoSegs = SegmentSize + SegmentSize,
803806
MostOfASegment = trunc(SegmentSize*0.75),
804807
SeqIdsA = lists:seq(0, MostOfASegment-1),
@@ -992,8 +995,8 @@ v2_delete_segment_file_completely_acked(Config) ->
992995
?MODULE, v2_delete_segment_file_completely_acked1, [Config]).
993996

994997
v2_delete_segment_file_completely_acked1(_Config) ->
995-
IndexMod = rabbit_classic_queue_index_v2,
996-
SegmentSize = IndexMod:next_segment_boundary(0),
998+
IndexMod = index_mod(),
999+
SegmentSize = segment_size(),
9971000
SeqIds = lists:seq(0, SegmentSize - 1),
9981001

9991002
with_empty_test_queue(
@@ -1019,8 +1022,8 @@ v2_delete_segment_file_partially_acked(Config) ->
10191022
?MODULE, v2_delete_segment_file_partially_acked1, [Config]).
10201023

10211024
v2_delete_segment_file_partially_acked1(_Config) ->
1022-
IndexMod = rabbit_classic_queue_index_v2,
1023-
SegmentSize = IndexMod:next_segment_boundary(0),
1025+
IndexMod = index_mod(),
1026+
SegmentSize = segment_size(),
10241027
SeqIds = lists:seq(0, SegmentSize div 2),
10251028
SeqIdsLen = length(SeqIds),
10261029

@@ -1047,8 +1050,8 @@ v2_delete_segment_file_partially_acked_with_holes(Config) ->
10471050
?MODULE, v2_delete_segment_file_partially_acked_with_holes1, [Config]).
10481051

10491052
v2_delete_segment_file_partially_acked_with_holes1(_Config) ->
1050-
IndexMod = rabbit_classic_queue_index_v2,
1051-
SegmentSize = IndexMod:next_segment_boundary(0),
1053+
IndexMod = index_mod(),
1054+
SegmentSize = segment_size(),
10521055
SeqIdsA = lists:seq(0, SegmentSize div 2),
10531056
SeqIdsB = lists:seq(11 + SegmentSize div 2, SegmentSize - 1),
10541057
SeqIdsLen = length(SeqIdsA) + length(SeqIdsB),
@@ -1107,8 +1110,7 @@ bq_queue_recover(Config) ->
11071110
?MODULE, bq_queue_recover1, [Config]).
11081111

11091112
bq_queue_recover1(Config) ->
1110-
IndexMod = index_mod(),
1111-
Count = 2 * IndexMod:next_segment_boundary(0),
1113+
Count = 2 * segment_size(),
11121114
QName0 = queue_name(Config, <<"bq_queue_recover-q">>),
11131115
{new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>),
11141116
QName = amqqueue:get_name(Q),
@@ -1164,14 +1166,13 @@ get_queue_sup_pid([], _QueuePid) ->
11641166

11651167
variable_queue_partial_segments_q_tail_thing(Config) ->
11661168
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1167-
?MODULE, variable_queue_partial_segments_q_tail_thing1, [Config]).
1169+
?MODULE, variable_queue_partial_segments_q_tail_thing1, []).
11681170

1169-
variable_queue_partial_segments_q_tail_thing1(Config) ->
1171+
variable_queue_partial_segments_q_tail_thing1() ->
11701172
with_fresh_variable_queue(fun variable_queue_partial_segments_q_tail_thing2/2).
11711173

11721174
variable_queue_partial_segments_q_tail_thing2(VQ0, _QName) ->
1173-
IndexMod = index_mod(),
1174-
SegmentSize = IndexMod:next_segment_boundary(0),
1175+
SegmentSize = segment_size(),
11751176
HalfSegment = SegmentSize div 2,
11761177
OneAndAHalfSegment = SegmentSize + HalfSegment,
11771178
VQ1 = variable_queue_publish(true, OneAndAHalfSegment, VQ0),
@@ -1194,12 +1195,8 @@ variable_queue_partial_segments_q_tail_thing2(VQ0, _QName) ->
11941195
SegmentSize + HalfSegment + 1, VQ5),
11951196
VQ7 = check_variable_queue_status(
11961197
VQ6,
1197-
%% We only read from q_tail up to the end of the segment, so
1198-
%% after fetching exactly one segment, we should have no
1199-
%% messages in memory.
1200-
[{q_head, 0},
1201-
{q_tail, {q_tail, SegmentSize, HalfSegment + 1, OneAndAHalfSegment + 1}},
1202-
{len, HalfSegment + 1}]),
1198+
%% We can't make assumptions about how many messages are in memory.
1199+
[{len, HalfSegment + 1}]),
12031200
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
12041201
HalfSegment + 1, VQ7),
12051202
{_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
@@ -1209,14 +1206,13 @@ variable_queue_partial_segments_q_tail_thing2(VQ0, _QName) ->
12091206

12101207
variable_queue_all_the_bits_not_covered_elsewhere_A(Config) ->
12111208
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1212-
?MODULE, variable_queue_all_the_bits_not_covered_elsewhere_A1, [Config]).
1209+
?MODULE, variable_queue_all_the_bits_not_covered_elsewhere_A1, []).
12131210

1214-
variable_queue_all_the_bits_not_covered_elsewhere_A1(Config) ->
1211+
variable_queue_all_the_bits_not_covered_elsewhere_A1() ->
12151212
with_fresh_variable_queue(fun variable_queue_all_the_bits_not_covered_elsewhere_A2/2).
12161213

12171214
variable_queue_all_the_bits_not_covered_elsewhere_A2(VQ0, QName) ->
1218-
IndexMod = index_mod(),
1219-
Count = 2 * IndexMod:next_segment_boundary(0),
1215+
Count = 2 * segment_size(),
12201216
VQ1 = variable_queue_publish(true, Count, VQ0),
12211217
VQ2 = variable_queue_publish(false, Count, VQ1),
12221218
{VQ4, _AckTags} = variable_queue_fetch(Count, true, false,
@@ -1234,9 +1230,9 @@ variable_queue_all_the_bits_not_covered_elsewhere_A2(VQ0, QName) ->
12341230

12351231
variable_queue_all_the_bits_not_covered_elsewhere_B(Config) ->
12361232
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1237-
?MODULE, variable_queue_all_the_bits_not_covered_elsewhere_B1, [Config]).
1233+
?MODULE, variable_queue_all_the_bits_not_covered_elsewhere_B1, []).
12381234

1239-
variable_queue_all_the_bits_not_covered_elsewhere_B1(Config) ->
1235+
variable_queue_all_the_bits_not_covered_elsewhere_B1() ->
12401236
with_fresh_variable_queue(fun variable_queue_all_the_bits_not_covered_elsewhere_B2/2).
12411237

12421238
variable_queue_all_the_bits_not_covered_elsewhere_B2(VQ1, QName) ->
@@ -1252,9 +1248,9 @@ variable_queue_all_the_bits_not_covered_elsewhere_B2(VQ1, QName) ->
12521248

12531249
variable_queue_drop(Config) ->
12541250
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1255-
?MODULE, variable_queue_drop1, [Config]).
1251+
?MODULE, variable_queue_drop1, []).
12561252

1257-
variable_queue_drop1(Config) ->
1253+
variable_queue_drop1() ->
12581254
with_fresh_variable_queue(fun variable_queue_drop2/2).
12591255

12601256
variable_queue_drop2(VQ0, _QName) ->
@@ -1275,9 +1271,9 @@ variable_queue_drop2(VQ0, _QName) ->
12751271

12761272
variable_queue_fold_msg_on_disk(Config) ->
12771273
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1278-
?MODULE, variable_queue_fold_msg_on_disk1, [Config]).
1274+
?MODULE, variable_queue_fold_msg_on_disk1, []).
12791275

1280-
variable_queue_fold_msg_on_disk1(Config) ->
1276+
variable_queue_fold_msg_on_disk1() ->
12811277
with_fresh_variable_queue(fun variable_queue_fold_msg_on_disk2/2).
12821278

12831279
variable_queue_fold_msg_on_disk2(VQ0, _QName) ->
@@ -1289,9 +1285,9 @@ variable_queue_fold_msg_on_disk2(VQ0, _QName) ->
12891285

12901286
variable_queue_dropfetchwhile(Config) ->
12911287
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1292-
?MODULE, variable_queue_dropfetchwhile1, [Config]).
1288+
?MODULE, variable_queue_dropfetchwhile1, []).
12931289

1294-
variable_queue_dropfetchwhile1(Config) ->
1290+
variable_queue_dropfetchwhile1() ->
12951291
with_fresh_variable_queue(fun variable_queue_dropfetchwhile2/2).
12961292

12971293
variable_queue_dropfetchwhile2(VQ0, _QName) ->
@@ -1335,9 +1331,9 @@ variable_queue_dropfetchwhile2(VQ0, _QName) ->
13351331

13361332
variable_queue_dropwhile_restart(Config) ->
13371333
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1338-
?MODULE, variable_queue_dropwhile_restart1, [Config]).
1334+
?MODULE, variable_queue_dropwhile_restart1, []).
13391335

1340-
variable_queue_dropwhile_restart1(Config) ->
1336+
variable_queue_dropwhile_restart1() ->
13411337
with_fresh_variable_queue(fun variable_queue_dropwhile_restart2/2).
13421338

13431339
variable_queue_dropwhile_restart2(VQ0, QName) ->
@@ -1372,9 +1368,9 @@ variable_queue_dropwhile_restart2(VQ0, QName) ->
13721368

13731369
variable_queue_dropwhile_sync_restart(Config) ->
13741370
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1375-
?MODULE, variable_queue_dropwhile_sync_restart1, [Config]).
1371+
?MODULE, variable_queue_dropwhile_sync_restart1, []).
13761372

1377-
variable_queue_dropwhile_sync_restart1(Config) ->
1373+
variable_queue_dropwhile_sync_restart1() ->
13781374
with_fresh_variable_queue(fun variable_queue_dropwhile_sync_restart2/2).
13791375

13801376
variable_queue_dropwhile_sync_restart2(VQ0, QName) ->
@@ -1412,9 +1408,9 @@ variable_queue_dropwhile_sync_restart2(VQ0, QName) ->
14121408

14131409
variable_queue_restart_large_seq_id(Config) ->
14141410
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1415-
?MODULE, variable_queue_restart_large_seq_id1, [Config]).
1411+
?MODULE, variable_queue_restart_large_seq_id1, []).
14161412

1417-
variable_queue_restart_large_seq_id1(Config) ->
1413+
variable_queue_restart_large_seq_id1() ->
14181414
with_fresh_variable_queue(fun variable_queue_restart_large_seq_id2/2).
14191415

14201416
variable_queue_restart_large_seq_id2(VQ0, QName) ->
@@ -1449,9 +1445,9 @@ variable_queue_restart_large_seq_id2(VQ0, QName) ->
14491445

14501446
variable_queue_ack_limiting(Config) ->
14511447
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1452-
?MODULE, variable_queue_ack_limiting1, [Config]).
1448+
?MODULE, variable_queue_ack_limiting1, []).
14531449

1454-
variable_queue_ack_limiting1(Config) ->
1450+
variable_queue_ack_limiting1() ->
14551451
with_fresh_variable_queue(fun variable_queue_ack_limiting2/2).
14561452

14571453
variable_queue_ack_limiting2(VQ0, _Config) ->
@@ -1477,9 +1473,9 @@ variable_queue_ack_limiting2(VQ0, _Config) ->
14771473

14781474
variable_queue_purge(Config) ->
14791475
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1480-
?MODULE, variable_queue_purge1, [Config]).
1476+
?MODULE, variable_queue_purge1, []).
14811477

1482-
variable_queue_purge1(Config) ->
1478+
variable_queue_purge1() ->
14831479
with_fresh_variable_queue(fun variable_queue_purge2/2).
14841480

14851481
variable_queue_purge2(VQ0, _Config) ->
@@ -1499,9 +1495,9 @@ variable_queue_purge2(VQ0, _Config) ->
14991495

15001496
variable_queue_requeue(Config) ->
15011497
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1502-
?MODULE, variable_queue_requeue1, [Config]).
1498+
?MODULE, variable_queue_requeue1, []).
15031499

1504-
variable_queue_requeue1(Config) ->
1500+
variable_queue_requeue1() ->
15051501
with_fresh_variable_queue(fun variable_queue_requeue2/2).
15061502

15071503
variable_queue_requeue2(VQ0, _Config) ->
@@ -1525,14 +1521,13 @@ variable_queue_requeue2(VQ0, _Config) ->
15251521
%% requeue from ram_pending_ack into q_head, move to q_tail and then empty queue
15261522
variable_queue_requeue_ram_beta(Config) ->
15271523
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1528-
?MODULE, variable_queue_requeue_ram_beta1, [Config]).
1524+
?MODULE, variable_queue_requeue_ram_beta1, []).
15291525

1530-
variable_queue_requeue_ram_beta1(Config) ->
1526+
variable_queue_requeue_ram_beta1() ->
15311527
with_fresh_variable_queue(fun variable_queue_requeue_ram_beta2/2).
15321528

15331529
variable_queue_requeue_ram_beta2(VQ0, _Config) ->
1534-
IndexMod = index_mod(),
1535-
Count = IndexMod:next_segment_boundary(0)*2 + 2,
1530+
Count = 2 + 2 * segment_size(),
15361531
VQ1 = variable_queue_publish(false, Count, VQ0),
15371532
{VQ2, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ1),
15381533
{Back, Front} = lists:split(Count div 2, AcksR),
@@ -1808,8 +1803,7 @@ requeue_one_by_one(Acks, VQ) ->
18081803
%% internal queues. Kept for completeness.
18091804
variable_queue_with_holes(VQ0) ->
18101805
Interval = 2048, %% should match vq:IO_BATCH_SIZE
1811-
IndexMod = index_mod(),
1812-
Count = IndexMod:next_segment_boundary(0)*2 + 2 * Interval,
1806+
Count = 2 * Interval + 2 * segment_size(),
18131807
Seq = lists:seq(1, Count),
18141808
VQ1 = variable_queue_publish(
18151809
false, 1, Count,

0 commit comments

Comments
 (0)