Skip to content

Commit 3089f33

Browse files
Merge pull request #6684 from rabbitmq/cqv2-memory
Reduce memory by multiple GBs with many classic queues
2 parents 4719a14 + b0d0308 commit 3089f33

9 files changed

+187
-21
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,6 +1064,11 @@ suites = [
10641064
"@proper//:erlang_app",
10651065
],
10661066
),
1067+
rabbitmq_integration_suite(
1068+
PACKAGE,
1069+
name = "unicode_SUITE",
1070+
size = "small",
1071+
),
10671072
]
10681073

10691074
assert_suites(

deps/rabbit/src/rabbit_classic_queue_index_v2.erl

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@
7171
queue_name :: rabbit_amqqueue:name(),
7272

7373
%% Queue index directory.
74-
dir :: file:filename(),
74+
%% Stored as binary() as opposed to file:filename() to save memory.
75+
dir :: binary(),
7576

7677
%% Buffer of all write operations to be performed.
7778
%% When the buffer reaches a certain size, we reduce
@@ -197,7 +198,7 @@ init1(Name, Dir, OnSyncFun, OnSyncMsgFun) ->
197198
ensure_queue_name_stub_file(Name, Dir),
198199
#qi{
199200
queue_name = Name,
200-
dir = Dir,
201+
dir = rabbit_file:filename_to_binary(Dir),
201202
on_sync = OnSyncFun,
202203
on_sync_msg = OnSyncMsgFun
203204
}.
@@ -217,7 +218,7 @@ reset_state(State = #qi{ queue_name = Name,
217218
on_sync_msg = OnSyncMsgFun }) ->
218219
?DEBUG("~0p", [State]),
219220
delete_and_terminate(State),
220-
init1(Name, Dir, OnSyncFun, OnSyncMsgFun).
221+
init1(Name, rabbit_file:binary_to_filename(Dir), OnSyncFun, OnSyncMsgFun).
221222

222223
-spec recover(rabbit_amqqueue:name(), shutdown_terms(), boolean(),
223224
contains_predicate(),
@@ -277,8 +278,9 @@ recover(#resource{ virtual_host = VHost, name = QueueName } = Name, Terms,
277278
State}
278279
end.
279280

280-
recover_segments(State0 = #qi { queue_name = Name, dir = Dir }, Terms, IsMsgStoreClean,
281+
recover_segments(State0 = #qi { queue_name = Name, dir = DirBin }, Terms, IsMsgStoreClean,
281282
ContainsCheckFun, OnSyncFun, OnSyncMsgFun, CountersRef, Context) ->
283+
Dir = rabbit_file:binary_to_filename(DirBin),
282284
SegmentFiles = rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir),
283285
State = case SegmentFiles of
284286
%% No segments found.
@@ -479,8 +481,9 @@ recover_index_v1_dirty(State0 = #qi{ queue_name = Name }, Terms, IsMsgStoreClean
479481

480482
%% At this point all messages are persistent because transient messages
481483
%% were dropped during the v1 index recovery.
482-
recover_index_v1_common(State0 = #qi{ queue_name = Name, dir = Dir },
484+
recover_index_v1_common(State0 = #qi{ queue_name = Name, dir = DirBin },
483485
V1State, CountersRef) ->
486+
Dir = rabbit_file:binary_to_filename(DirBin),
484487
%% Use a temporary per-queue store state to store embedded messages.
485488
StoreState0 = rabbit_classic_queue_store_v2:init(Name),
486489
%% Go through the v1 index and publish messages to the v2 index.
@@ -539,7 +542,8 @@ terminate(VHost, Terms, State0 = #qi { dir = Dir,
539542
end, OpenFds),
540543
file_handle_cache:release_reservation(),
541544
%% Write recovery terms for faster recovery.
542-
rabbit_recovery_terms:store(VHost, filename:basename(Dir),
545+
rabbit_recovery_terms:store(VHost,
546+
filename:basename(rabbit_file:binary_to_filename(Dir)),
543547
[{v2_index_state, {?VERSION, Segments}} | Terms]),
544548
State#qi{ segments = #{},
545549
fds = #{} }.
@@ -555,7 +559,7 @@ delete_and_terminate(State = #qi { dir = Dir,
555559
end, OpenFds),
556560
file_handle_cache:release_reservation(),
557561
%% Erase the data on disk.
558-
ok = erase_index_dir(Dir),
562+
ok = erase_index_dir(rabbit_file:binary_to_filename(Dir)),
559563
State#qi{ segments = #{},
560564
fds = #{} }.
561565

@@ -1277,7 +1281,8 @@ queue_name_to_dir_name(#resource { kind = queue,
12771281
rabbit_misc:format("~.36B", [Num]).
12781282

12791283
segment_file(Segment, #qi{ dir = Dir }) ->
1280-
filename:join(Dir, integer_to_list(Segment) ++ ?SEGMENT_EXTENSION).
1284+
filename:join(rabbit_file:binary_to_filename(Dir),
1285+
integer_to_list(Segment) ++ ?SEGMENT_EXTENSION).
12811286

12821287
highest_continuous_seq_id([SeqId|Tail], EndSeqId)
12831288
when (1 + SeqId) =:= EndSeqId ->

deps/rabbit/src/rabbit_classic_queue_store_v2.erl

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@
7676

7777
-record(qs, {
7878
%% Store directory - same as the queue index.
79-
dir :: file:filename(),
79+
%% Stored as binary() as opposed to file:filename() to save memory.
80+
dir :: binary(),
8081

8182
%% We keep track of which segment is open
8283
%% and the current offset in the file. This offset
@@ -117,7 +118,7 @@ init(#resource{ virtual_host = VHost } = Name) ->
117118
?DEBUG("~0p", [Name]),
118119
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
119120
Dir = rabbit_classic_queue_index_v2:queue_dir(VHostDir, Name),
120-
#qs{dir = Dir}.
121+
#qs{dir = rabbit_file:filename_to_binary(Dir)}.
121122

122123
-spec terminate(State) -> State when State::state().
123124

@@ -570,4 +571,5 @@ check_crc32() ->
570571
%% Same implementation as rabbit_classic_queue_index_v2:segment_file/2,
571572
%% but with a different state record.
572573
segment_file(Segment, #qs{ dir = Dir }) ->
573-
filename:join(Dir, integer_to_list(Segment) ++ ?SEGMENT_EXTENSION).
574+
filename:join(rabbit_file:binary_to_filename(Dir),
575+
integer_to_list(Segment) ++ ?SEGMENT_EXTENSION).

deps/rabbit/src/rabbit_file.erl

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
-export([lock_file/1]).
1717
-export([read_file_info/1]).
1818
-export([filename_as_a_directory/1]).
19+
-export([filename_to_binary/1, binary_to_filename/1]).
1920

2021
-import(file_handle_cache, [with_handle/1, with_handle/2]).
2122

@@ -328,3 +329,23 @@ filename_as_a_directory(FileName) ->
328329
_ ->
329330
FileName ++ "/"
330331
end.
332+
333+
-spec filename_to_binary(file:filename()) ->
334+
binary().
335+
filename_to_binary(Name) when is_list(Name) ->
336+
case unicode:characters_to_binary(Name, unicode, file:native_name_encoding()) of
337+
Bin when is_binary(Bin) ->
338+
Bin;
339+
Other ->
340+
erlang:error(Other)
341+
end.
342+
343+
-spec binary_to_filename(binary()) ->
344+
file:filename().
345+
binary_to_filename(Bin) when is_binary(Bin) ->
346+
case unicode:characters_to_list(Bin, file:native_name_encoding()) of
347+
Name when is_list(Name) ->
348+
Name;
349+
Other ->
350+
erlang:error(Other)
351+
end.

deps/rabbit/src/rabbit_msg_store.erl

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
-record(msstate,
5151
{
5252
%% store directory
53-
dir,
53+
dir :: file:filename(),
5454
%% the module for index ops,
5555
%% rabbit_msg_store_ets_index by default
5656
index_module,
@@ -149,7 +149,8 @@
149149
file_handle_cache :: map(),
150150
index_state :: any(),
151151
index_module :: atom(),
152-
dir :: file:filename(),
152+
%% Stored as binary() as opposed to file:filename() to save memory.
153+
dir :: binary(),
153154
gc_pid :: pid(),
154155
file_handles_ets :: ets:tid(),
155156
file_summary_ets :: ets:tid(),
@@ -466,7 +467,7 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) when is_pid(Server); is_atom
466467
file_handle_cache = #{},
467468
index_state = IState,
468469
index_module = IModule,
469-
dir = Dir,
470+
dir = rabbit_file:filename_to_binary(Dir),
470471
gc_pid = GCPid,
471472
file_handles_ets = FileHandlesEts,
472473
file_summary_ets = FileSummaryEts,
@@ -1509,11 +1510,17 @@ get_read_handle(FileNum, State = #msstate { file_handle_cache = FHC,
15091510
get_read_handle(FileNum, FHC, Dir) ->
15101511
case maps:find(FileNum, FHC) of
15111512
{ok, Hdl} -> {Hdl, FHC};
1512-
error -> {ok, Hdl} = open_file(Dir, filenum_to_name(FileNum),
1513+
error -> {ok, Hdl} = open_file(to_filename(Dir),
1514+
filenum_to_name(FileNum),
15131515
?READ_MODE),
15141516
{Hdl, maps:put(FileNum, Hdl, FHC)}
15151517
end.
15161518

1519+
to_filename(Name) when is_list(Name) ->
1520+
Name;
1521+
to_filename(Bin) when is_binary(Bin) ->
1522+
rabbit_file:binary_to_filename(Bin).
1523+
15171524
preallocate(Hdl, FileSizeLimit, FinalPos) ->
15181525
{ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit),
15191526
ok = file_handle_cache:truncate(Hdl),

deps/rabbit/src/rabbit_msg_store_ets_index.erl

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,21 @@
1818
-define(MSG_LOC_NAME, rabbit_msg_store_ets_index).
1919
-define(FILENAME, "msg_store_index.ets").
2020

21-
-record(state, { table, dir }).
21+
-record(state,
22+
{table,
23+
%% Stored as binary() as opposed to file:filename() to save memory.
24+
dir :: binary()}).
2225

2326
new(Dir) ->
2427
file:delete(filename:join(Dir, ?FILENAME)),
2528
Tid = ets:new(?MSG_LOC_NAME, [set, public, {keypos, #msg_location.msg_id}]),
26-
#state { table = Tid, dir = Dir }.
29+
#state { table = Tid, dir = rabbit_file:filename_to_binary(Dir) }.
2730

2831
recover(Dir) ->
2932
Path = filename:join(Dir, ?FILENAME),
3033
case ets:file2tab(Path) of
3134
{ok, Tid} -> file:delete(Path),
32-
{ok, #state { table = Tid, dir = Dir }};
35+
{ok, #state { table = Tid, dir = rabbit_file:filename_to_binary(Dir) }};
3336
Error -> Error
3437
end.
3538

@@ -64,7 +67,8 @@ clean_up_temporary_reference_count_entries_without_file(State) ->
6467
ets:select_delete(State #state.table, [{MatchHead, [], [true]}]),
6568
ok.
6669

67-
terminate(#state { table = MsgLocations, dir = Dir }) ->
70+
terminate(#state { table = MsgLocations, dir = DirBin }) ->
71+
Dir = rabbit_file:binary_to_filename(DirBin),
6872
case ets:tab2file(MsgLocations, filename:join(Dir, ?FILENAME),
6973
[{extended_info, [object_count]}]) of
7074
ok -> ok;

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
%% is stored in the per-vhost shared rabbit_msg_store
7070
%%
7171
%% When messages must be read from disk, message bodies will
72-
%% also be read from disk except if the message in stored
72+
%% also be read from disk except if the message is stored
7373
%% in the per-vhost shared rabbit_msg_store. In that case
7474
%% the message gets read before it needs to be sent to the
7575
%% consumer. Messages are read from rabbit_msg_store one

deps/rabbit/test/rabbit_stream_queue_SUITE.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -911,7 +911,7 @@ consume(Config) ->
911911
_ = amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = <<"ctag">>}),
912912
ok = amqp_channel:close(Ch1)
913913
after 5000 ->
914-
exit(timeout)
914+
ct:fail(timeout)
915915
end,
916916
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
917917

deps/rabbit/test/unicode_SUITE.erl

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
-module(unicode_SUITE).
2+
3+
-include_lib("common_test/include/ct.hrl").
4+
-include_lib("eunit/include/eunit.hrl").
5+
-include_lib("amqp_client/include/amqp_client.hrl").
6+
7+
-compile(export_all).
8+
9+
%% Unicode U+1F407
10+
-define(UNICODE_STRING, "bunny🐇bunny").
11+
12+
all() ->
13+
[
14+
{group, queues}
15+
].
16+
17+
groups() ->
18+
[
19+
{queues, [], [
20+
classic_queue_v1,
21+
classic_queue_v2,
22+
quorum_queue,
23+
stream
24+
]}
25+
].
26+
27+
%% -------------------------------------------------------------------
28+
%% Testsuite setup/teardown.
29+
%% -------------------------------------------------------------------
30+
31+
init_per_suite(Config) ->
32+
rabbit_ct_helpers:log_environment(),
33+
rabbit_ct_helpers:run_setup_steps(Config).
34+
35+
end_per_suite(Config) ->
36+
rabbit_ct_helpers:run_teardown_steps(Config).
37+
38+
init_per_group(Group, Config0) ->
39+
PrivDir0 = ?config(priv_dir, Config0),
40+
PrivDir = filename:join(PrivDir0, ?UNICODE_STRING),
41+
ok = file:make_dir(PrivDir),
42+
Config = rabbit_ct_helpers:set_config(Config0, [{priv_dir, PrivDir},
43+
{rmq_nodename_suffix, Group}]),
44+
rabbit_ct_helpers:run_steps(Config,
45+
rabbit_ct_broker_helpers:setup_steps() ++
46+
rabbit_ct_client_helpers:setup_steps()
47+
).
48+
49+
end_per_group(_, Config) ->
50+
rabbit_ct_helpers:run_steps(Config,
51+
rabbit_ct_client_helpers:teardown_steps() ++
52+
rabbit_ct_broker_helpers:teardown_steps()).
53+
54+
init_per_testcase(Testcase, Config) ->
55+
rabbit_ct_helpers:testcase_started(Config, Testcase).
56+
57+
end_per_testcase(Testcase, Config) ->
58+
rabbit_ct_helpers:testcase_finished(Config, Testcase).
59+
60+
classic_queue_v1(Config) ->
61+
ok = rabbit_ct_broker_helpers:rpc(
62+
Config, 0, application, set_env, [rabbit, classic_queue_default_version, 1]),
63+
ok = queue(Config, ?FUNCTION_NAME, []).
64+
65+
classic_queue_v2(Config) ->
66+
ok = rabbit_ct_broker_helpers:rpc(
67+
Config, 0, application, set_env, [rabbit, classic_queue_default_version, 2]),
68+
ok = queue(Config, ?FUNCTION_NAME, []).
69+
70+
quorum_queue(Config) ->
71+
ok = queue(Config, ?FUNCTION_NAME, [{<<"x-queue-type">>, longstr, <<"quorum">>}]).
72+
73+
queue(Config, QName0, Args) ->
74+
QName1 = rabbit_data_coercion:to_binary(QName0),
75+
QName = <<QName1/binary, ?UNICODE_STRING/utf8>>,
76+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
77+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
78+
amqp_channel:call(Ch, #'queue.declare'{queue = QName,
79+
durable = true,
80+
arguments = Args
81+
}),
82+
rabbit_ct_client_helpers:publish(Ch, QName, 1),
83+
{#'basic.get_ok'{}, #amqp_msg{payload = <<"1">>}} =
84+
amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = false}),
85+
{'queue.delete_ok', 0} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
86+
ok.
87+
88+
stream(Config) ->
89+
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, stream_queue),
90+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
91+
ConsumerTag = QName0 = atom_to_binary(?FUNCTION_NAME),
92+
QName = <<QName0/binary, ?UNICODE_STRING/utf8>>,
93+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
94+
amqp_channel:call(Ch, #'queue.declare'{queue = QName,
95+
durable = true,
96+
arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]
97+
}),
98+
rabbit_ct_client_helpers:publish(Ch, QName, 1),
99+
?assertMatch(#'basic.qos_ok'{},
100+
amqp_channel:call(Ch, #'basic.qos'{global = false,
101+
prefetch_count = 1})),
102+
amqp_channel:subscribe(Ch,
103+
#'basic.consume'{queue = QName,
104+
no_ack = false,
105+
consumer_tag = ConsumerTag,
106+
arguments = [{<<"x-stream-offset">>, long, 0}]},
107+
self()),
108+
receive
109+
#'basic.consume_ok'{consumer_tag = ConsumerTag} ->
110+
ok
111+
end,
112+
DelTag = receive
113+
{#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
114+
DeliveryTag
115+
after 5000 ->
116+
ct:fail(timeout)
117+
end,
118+
ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DelTag,
119+
multiple = false}),
120+
amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = ConsumerTag}),
121+
{'queue.delete_ok', 0} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
122+
ok.

0 commit comments

Comments
 (0)