Skip to content

Commit 8f0800e

Browse files
ansdlukebakken
authored andcommitted
Make classic queues v2 memory efficient
Store directory names as binary instead of string. This commit saves >1GB of memory per 100,000 classic queues v2. With longish node names, the memory savings are even much higher. This commit is especially a prerequisite for scalalbe MQTT where every subscribing MQTT connection creates its own classic queue. So, with 3 million MQTT subscribers, this commit saves >30 GB of memory. This commits stores file names as binaries and converts back to file:filename() when passed to file API functions. This is to reduce risk of breaking behaviour for path names containing unicode chars on certain platforms. Alternatives to the implementation in this commit: 1. Store common directory list prefix only once (e.g. put it into persistent_term) and store per queue directory names in ETS. 2. Use file:filename_all() instead of file:filename() and pass binaries to the file module functions. However this might be brittle on some platforms since these binaries are interpreted as "raw filenames". Using raw filenames requires more changes to classic queues which we want to avoid to reduce risk. The downside of the implemenation in this commit is that the binary gets converted to a list sometimes. This happens whenever a file is flushed or a new file gets created for example. Following perf tests did not show any regression in performance: ``` java -jar target/perf-test.jar -s 10 -x 1 -y 0 -u q -f persistent -z 30 java -jar target/perf-test.jar -s 10000 -x 1 -y 0 -u q -f persistent -z 30 java -jar target/perf-test.jar -s 10 -x 100 -qp q%d -qpf 1 -qpt 100 -y 0 -f persistent -z 60 -c 1000 ``` Furthermore `rabbit_file` did not show up in the CPU flame graphs either.
1 parent 5a8530c commit 8f0800e

9 files changed

+183
-21
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,6 +1064,11 @@ suites = [
10641064
"@proper//:erlang_app",
10651065
],
10661066
),
1067+
rabbitmq_integration_suite(
1068+
PACKAGE,
1069+
name = "unicode_SUITE",
1070+
size = "small",
1071+
),
10671072
]
10681073

10691074
assert_suites(

deps/rabbit/src/rabbit_classic_queue_index_v2.erl

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@
7171
queue_name :: rabbit_amqqueue:name(),
7272

7373
%% Queue index directory.
74-
dir :: file:filename(),
74+
%% Stored as binary() as opposed to file:filename() to save memory.
75+
dir :: binary(),
7576

7677
%% Buffer of all write operations to be performed.
7778
%% When the buffer reaches a certain size, we reduce
@@ -197,7 +198,7 @@ init1(Name, Dir, OnSyncFun, OnSyncMsgFun) ->
197198
ensure_queue_name_stub_file(Name, Dir),
198199
#qi{
199200
queue_name = Name,
200-
dir = Dir,
201+
dir = rabbit_file:filename_to_binary(Dir),
201202
on_sync = OnSyncFun,
202203
on_sync_msg = OnSyncMsgFun
203204
}.
@@ -217,7 +218,7 @@ reset_state(State = #qi{ queue_name = Name,
217218
on_sync_msg = OnSyncMsgFun }) ->
218219
?DEBUG("~0p", [State]),
219220
delete_and_terminate(State),
220-
init1(Name, Dir, OnSyncFun, OnSyncMsgFun).
221+
init1(Name, rabbit_file:binary_to_filename(Dir), OnSyncFun, OnSyncMsgFun).
221222

222223
-spec recover(rabbit_amqqueue:name(), shutdown_terms(), boolean(),
223224
contains_predicate(),
@@ -277,8 +278,9 @@ recover(#resource{ virtual_host = VHost, name = QueueName } = Name, Terms,
277278
State}
278279
end.
279280

280-
recover_segments(State0 = #qi { queue_name = Name, dir = Dir }, Terms, IsMsgStoreClean,
281+
recover_segments(State0 = #qi { queue_name = Name, dir = DirBin }, Terms, IsMsgStoreClean,
281282
ContainsCheckFun, OnSyncFun, OnSyncMsgFun, CountersRef, Context) ->
283+
Dir = rabbit_file:binary_to_filename(DirBin),
282284
SegmentFiles = rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir),
283285
State = case SegmentFiles of
284286
%% No segments found.
@@ -479,8 +481,9 @@ recover_index_v1_dirty(State0 = #qi{ queue_name = Name }, Terms, IsMsgStoreClean
479481

480482
%% At this point all messages are persistent because transient messages
481483
%% were dropped during the v1 index recovery.
482-
recover_index_v1_common(State0 = #qi{ queue_name = Name, dir = Dir },
484+
recover_index_v1_common(State0 = #qi{ queue_name = Name, dir = DirBin },
483485
V1State, CountersRef) ->
486+
Dir = rabbit_file:binary_to_filename(DirBin),
484487
%% Use a temporary per-queue store state to store embedded messages.
485488
StoreState0 = rabbit_classic_queue_store_v2:init(Name),
486489
%% Go through the v1 index and publish messages to the v2 index.
@@ -539,7 +542,8 @@ terminate(VHost, Terms, State0 = #qi { dir = Dir,
539542
end, OpenFds),
540543
file_handle_cache:release_reservation(),
541544
%% Write recovery terms for faster recovery.
542-
rabbit_recovery_terms:store(VHost, filename:basename(Dir),
545+
rabbit_recovery_terms:store(VHost,
546+
filename:basename(rabbit_file:binary_to_filename(Dir)),
543547
[{v2_index_state, {?VERSION, Segments}} | Terms]),
544548
State#qi{ segments = #{},
545549
fds = #{} }.
@@ -555,7 +559,7 @@ delete_and_terminate(State = #qi { dir = Dir,
555559
end, OpenFds),
556560
file_handle_cache:release_reservation(),
557561
%% Erase the data on disk.
558-
ok = erase_index_dir(Dir),
562+
ok = erase_index_dir(rabbit_file:binary_to_filename(Dir)),
559563
State#qi{ segments = #{},
560564
fds = #{} }.
561565

@@ -1277,7 +1281,8 @@ queue_name_to_dir_name(#resource { kind = queue,
12771281
rabbit_misc:format("~.36B", [Num]).
12781282

12791283
segment_file(Segment, #qi{ dir = Dir }) ->
1280-
filename:join(Dir, integer_to_list(Segment) ++ ?SEGMENT_EXTENSION).
1284+
filename:join(rabbit_file:binary_to_filename(Dir),
1285+
integer_to_list(Segment) ++ ?SEGMENT_EXTENSION).
12811286

12821287
highest_continuous_seq_id([SeqId|Tail], EndSeqId)
12831288
when (1 + SeqId) =:= EndSeqId ->

deps/rabbit/src/rabbit_classic_queue_store_v2.erl

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@
7676

7777
-record(qs, {
7878
%% Store directory - same as the queue index.
79-
dir :: file:filename(),
79+
%% Stored as binary() as opposed to file:filename() to save memory.
80+
dir :: binary(),
8081

8182
%% We keep track of which segment is open
8283
%% and the current offset in the file. This offset
@@ -117,7 +118,7 @@ init(#resource{ virtual_host = VHost } = Name) ->
117118
?DEBUG("~0p", [Name]),
118119
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
119120
Dir = rabbit_classic_queue_index_v2:queue_dir(VHostDir, Name),
120-
#qs{dir = Dir}.
121+
#qs{dir = rabbit_file:filename_to_binary(Dir)}.
121122

122123
-spec terminate(State) -> State when State::state().
123124

@@ -570,4 +571,5 @@ check_crc32() ->
570571
%% Same implementation as rabbit_classic_queue_index_v2:segment_file/2,
571572
%% but with a different state record.
572573
segment_file(Segment, #qs{ dir = Dir }) ->
573-
filename:join(Dir, integer_to_list(Segment) ++ ?SEGMENT_EXTENSION).
574+
filename:join(rabbit_file:binary_to_filename(Dir),
575+
integer_to_list(Segment) ++ ?SEGMENT_EXTENSION).

deps/rabbit/src/rabbit_file.erl

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
-export([lock_file/1]).
1717
-export([read_file_info/1]).
1818
-export([filename_as_a_directory/1]).
19+
-export([filename_to_binary/1, binary_to_filename/1]).
1920

2021
-import(file_handle_cache, [with_handle/1, with_handle/2]).
2122

@@ -328,3 +329,23 @@ filename_as_a_directory(FileName) ->
328329
_ ->
329330
FileName ++ "/"
330331
end.
332+
333+
-spec filename_to_binary(file:filename()) ->
334+
binary().
335+
filename_to_binary(Name) when is_list(Name) ->
336+
case unicode:characters_to_binary(Name, unicode, file:native_name_encoding()) of
337+
Bin when is_binary(Bin) ->
338+
Bin;
339+
Other ->
340+
erlang:error(Other)
341+
end.
342+
343+
-spec binary_to_filename(binary()) ->
344+
file:filename().
345+
binary_to_filename(Bin) when is_binary(Bin) ->
346+
case unicode:characters_to_list(Bin, file:native_name_encoding()) of
347+
Name when is_list(Name) ->
348+
Name;
349+
Other ->
350+
erlang:error(Other)
351+
end.

deps/rabbit/src/rabbit_msg_store.erl

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
-record(msstate,
5151
{
5252
%% store directory
53-
dir,
53+
dir :: file:filename(),
5454
%% the module for index ops,
5555
%% rabbit_msg_store_ets_index by default
5656
index_module,
@@ -149,7 +149,8 @@
149149
file_handle_cache :: map(),
150150
index_state :: any(),
151151
index_module :: atom(),
152-
dir :: file:filename(),
152+
%% Stored as binary() as opposed to file:filename() to save memory.
153+
dir :: binary(),
153154
gc_pid :: pid(),
154155
file_handles_ets :: ets:tid(),
155156
file_summary_ets :: ets:tid(),
@@ -466,7 +467,7 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) when is_pid(Server); is_atom
466467
file_handle_cache = #{},
467468
index_state = IState,
468469
index_module = IModule,
469-
dir = Dir,
470+
dir = rabbit_file:filename_to_binary(Dir),
470471
gc_pid = GCPid,
471472
file_handles_ets = FileHandlesEts,
472473
file_summary_ets = FileSummaryEts,
@@ -1509,11 +1510,17 @@ get_read_handle(FileNum, State = #msstate { file_handle_cache = FHC,
15091510
get_read_handle(FileNum, FHC, Dir) ->
15101511
case maps:find(FileNum, FHC) of
15111512
{ok, Hdl} -> {Hdl, FHC};
1512-
error -> {ok, Hdl} = open_file(Dir, filenum_to_name(FileNum),
1513+
error -> {ok, Hdl} = open_file(to_filename(Dir),
1514+
filenum_to_name(FileNum),
15131515
?READ_MODE),
15141516
{Hdl, maps:put(FileNum, Hdl, FHC)}
15151517
end.
15161518

1519+
to_filename(Name) when is_list(Name) ->
1520+
Name;
1521+
to_filename(Bin) when is_binary(Bin) ->
1522+
rabbit_file:binary_to_filename(Bin).
1523+
15171524
preallocate(Hdl, FileSizeLimit, FinalPos) ->
15181525
{ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit),
15191526
ok = file_handle_cache:truncate(Hdl),

deps/rabbit/src/rabbit_msg_store_ets_index.erl

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,21 @@
1818
-define(MSG_LOC_NAME, rabbit_msg_store_ets_index).
1919
-define(FILENAME, "msg_store_index.ets").
2020

21-
-record(state, { table, dir }).
21+
-record(state,
22+
{table,
23+
%% Stored as binary() as opposed to file:filename() to save memory.
24+
dir :: binary()}).
2225

2326
new(Dir) ->
2427
file:delete(filename:join(Dir, ?FILENAME)),
2528
Tid = ets:new(?MSG_LOC_NAME, [set, public, {keypos, #msg_location.msg_id}]),
26-
#state { table = Tid, dir = Dir }.
29+
#state { table = Tid, dir = rabbit_file:filename_to_binary(Dir) }.
2730

2831
recover(Dir) ->
2932
Path = filename:join(Dir, ?FILENAME),
3033
case ets:file2tab(Path) of
3134
{ok, Tid} -> file:delete(Path),
32-
{ok, #state { table = Tid, dir = Dir }};
35+
{ok, #state { table = Tid, dir = rabbit_file:filename_to_binary(Dir) }};
3336
Error -> Error
3437
end.
3538

@@ -64,7 +67,8 @@ clean_up_temporary_reference_count_entries_without_file(State) ->
6467
ets:select_delete(State #state.table, [{MatchHead, [], [true]}]),
6568
ok.
6669

67-
terminate(#state { table = MsgLocations, dir = Dir }) ->
70+
terminate(#state { table = MsgLocations, dir = DirBin }) ->
71+
Dir = rabbit_file:binary_to_filename(DirBin),
6872
case ets:tab2file(MsgLocations, filename:join(Dir, ?FILENAME),
6973
[{extended_info, [object_count]}]) of
7074
ok -> ok;

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
%% is stored in the per-vhost shared rabbit_msg_store
7070
%%
7171
%% When messages must be read from disk, message bodies will
72-
%% also be read from disk except if the message in stored
72+
%% also be read from disk except if the message is stored
7373
%% in the per-vhost shared rabbit_msg_store. In that case
7474
%% the message gets read before it needs to be sent to the
7575
%% consumer. Messages are read from rabbit_msg_store one

deps/rabbit/test/rabbit_stream_queue_SUITE.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -911,7 +911,7 @@ consume(Config) ->
911911
_ = amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = <<"ctag">>}),
912912
ok = amqp_channel:close(Ch1)
913913
after 5000 ->
914-
exit(timeout)
914+
ct:fail(timeout)
915915
end,
916916
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
917917

deps/rabbit/test/unicode_SUITE.erl

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
-module(unicode_SUITE).
2+
3+
-include_lib("common_test/include/ct.hrl").
4+
-include_lib("eunit/include/eunit.hrl").
5+
-include_lib("amqp_client/include/amqp_client.hrl").
6+
7+
-compile(export_all).
8+
9+
all() ->
10+
[
11+
{group, queues}
12+
].
13+
14+
groups() ->
15+
[
16+
{queues, [], [
17+
classic_queue_v1,
18+
classic_queue_v2,
19+
quorum_queue,
20+
stream
21+
]}
22+
].
23+
24+
%% -------------------------------------------------------------------
25+
%% Testsuite setup/teardown.
26+
%% -------------------------------------------------------------------
27+
28+
init_per_suite(Config) ->
29+
rabbit_ct_helpers:log_environment(),
30+
rabbit_ct_helpers:run_setup_steps(Config).
31+
32+
end_per_suite(Config) ->
33+
rabbit_ct_helpers:run_teardown_steps(Config).
34+
35+
init_per_group(Group, Config0) ->
36+
PrivDir0 = ?config(priv_dir, Config0),
37+
%% Put unicode char U+1F407 in directory name
38+
PrivDir = filename:join(PrivDir0, "bunny🐇bunny"),
39+
ok = file:make_dir(PrivDir),
40+
Config = rabbit_ct_helpers:set_config(Config0, [{priv_dir, PrivDir},
41+
{rmq_nodename_suffix, Group}]),
42+
rabbit_ct_helpers:run_steps(Config,
43+
rabbit_ct_broker_helpers:setup_steps() ++
44+
rabbit_ct_client_helpers:setup_steps()
45+
).
46+
47+
end_per_group(_, Config) ->
48+
rabbit_ct_helpers:run_steps(Config,
49+
rabbit_ct_client_helpers:teardown_steps() ++
50+
rabbit_ct_broker_helpers:teardown_steps()).
51+
52+
init_per_testcase(Testcase, Config) ->
53+
rabbit_ct_helpers:testcase_started(Config, Testcase).
54+
55+
end_per_testcase(Testcase, Config) ->
56+
rabbit_ct_helpers:testcase_finished(Config, Testcase).
57+
58+
classic_queue_v1(Config) ->
59+
ok = rabbit_ct_broker_helpers:rpc(
60+
Config, 0, application, set_env, [rabbit, classic_queue_default_version, 1]),
61+
ok = queue(Config, ?FUNCTION_NAME, []).
62+
63+
classic_queue_v2(Config) ->
64+
ok = rabbit_ct_broker_helpers:rpc(
65+
Config, 0, application, set_env, [rabbit, classic_queue_default_version, 2]),
66+
ok = queue(Config, ?FUNCTION_NAME, []).
67+
68+
quorum_queue(Config) ->
69+
ok = queue(Config, ?FUNCTION_NAME, [{<<"x-queue-type">>, longstr, <<"quorum">>}]).
70+
71+
queue(Config, QName0, Args) ->
72+
QName = rabbit_data_coercion:to_binary(QName0),
73+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
74+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
75+
amqp_channel:call(Ch, #'queue.declare'{queue = QName,
76+
durable = true,
77+
arguments = Args
78+
}),
79+
rabbit_ct_client_helpers:publish(Ch, QName, 1),
80+
{#'basic.get_ok'{}, #amqp_msg{payload = <<"1">>}} =
81+
amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = false}),
82+
{'queue.delete_ok', 0} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
83+
ok.
84+
85+
stream(Config) ->
86+
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, stream_queue),
87+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
88+
ConsumerTag = QName = atom_to_binary(?FUNCTION_NAME),
89+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
90+
amqp_channel:call(Ch, #'queue.declare'{queue = QName,
91+
durable = true,
92+
arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]
93+
}),
94+
rabbit_ct_client_helpers:publish(Ch, QName, 1),
95+
?assertMatch(#'basic.qos_ok'{},
96+
amqp_channel:call(Ch, #'basic.qos'{global = false,
97+
prefetch_count = 1})),
98+
amqp_channel:subscribe(Ch,
99+
#'basic.consume'{queue = QName,
100+
no_ack = false,
101+
consumer_tag = ConsumerTag,
102+
arguments = [{<<"x-stream-offset">>, long, 0}]},
103+
self()),
104+
receive
105+
#'basic.consume_ok'{consumer_tag = ConsumerTag} ->
106+
ok
107+
end,
108+
DelTag = receive
109+
{#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
110+
DeliveryTag
111+
after 5000 ->
112+
ct:fail(timeout)
113+
end,
114+
ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DelTag,
115+
multiple = false}),
116+
amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = ConsumerTag}),
117+
{'queue.delete_ok', 0} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
118+
ok.

0 commit comments

Comments
 (0)