Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,11 @@ suites = [
"@proper//:erlang_app",
],
),
rabbitmq_integration_suite(
PACKAGE,
name = "unicode_SUITE",
size = "small",
),
]

assert_suites(
Expand Down
21 changes: 13 additions & 8 deletions deps/rabbit/src/rabbit_classic_queue_index_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@
queue_name :: rabbit_amqqueue:name(),

%% Queue index directory.
dir :: file:filename(),
%% Stored as binary() as opposed to file:filename() to save memory.
dir :: binary(),

%% Buffer of all write operations to be performed.
%% When the buffer reaches a certain size, we reduce
Expand Down Expand Up @@ -197,7 +198,7 @@ init1(Name, Dir, OnSyncFun, OnSyncMsgFun) ->
ensure_queue_name_stub_file(Name, Dir),
#qi{
queue_name = Name,
dir = Dir,
dir = rabbit_file:filename_to_binary(Dir),
on_sync = OnSyncFun,
on_sync_msg = OnSyncMsgFun
}.
Expand All @@ -217,7 +218,7 @@ reset_state(State = #qi{ queue_name = Name,
on_sync_msg = OnSyncMsgFun }) ->
?DEBUG("~0p", [State]),
delete_and_terminate(State),
init1(Name, Dir, OnSyncFun, OnSyncMsgFun).
init1(Name, rabbit_file:binary_to_filename(Dir), OnSyncFun, OnSyncMsgFun).

-spec recover(rabbit_amqqueue:name(), shutdown_terms(), boolean(),
contains_predicate(),
Expand Down Expand Up @@ -277,8 +278,9 @@ recover(#resource{ virtual_host = VHost, name = QueueName } = Name, Terms,
State}
end.

recover_segments(State0 = #qi { queue_name = Name, dir = Dir }, Terms, IsMsgStoreClean,
recover_segments(State0 = #qi { queue_name = Name, dir = DirBin }, Terms, IsMsgStoreClean,
ContainsCheckFun, OnSyncFun, OnSyncMsgFun, CountersRef, Context) ->
Dir = rabbit_file:binary_to_filename(DirBin),
SegmentFiles = rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir),
State = case SegmentFiles of
%% No segments found.
Expand Down Expand Up @@ -479,8 +481,9 @@ recover_index_v1_dirty(State0 = #qi{ queue_name = Name }, Terms, IsMsgStoreClean

%% At this point all messages are persistent because transient messages
%% were dropped during the v1 index recovery.
recover_index_v1_common(State0 = #qi{ queue_name = Name, dir = Dir },
recover_index_v1_common(State0 = #qi{ queue_name = Name, dir = DirBin },
V1State, CountersRef) ->
Dir = rabbit_file:binary_to_filename(DirBin),
%% Use a temporary per-queue store state to store embedded messages.
StoreState0 = rabbit_classic_queue_store_v2:init(Name, fun(_, _) -> ok end),
%% Go through the v1 index and publish messages to the v2 index.
Expand Down Expand Up @@ -539,7 +542,8 @@ terminate(VHost, Terms, State0 = #qi { dir = Dir,
end, OpenFds),
file_handle_cache:release_reservation(),
%% Write recovery terms for faster recovery.
rabbit_recovery_terms:store(VHost, filename:basename(Dir),
rabbit_recovery_terms:store(VHost,
filename:basename(rabbit_file:binary_to_filename(Dir)),
[{v2_index_state, {?VERSION, Segments}} | Terms]),
State#qi{ segments = #{},
fds = #{} }.
Expand All @@ -555,7 +559,7 @@ delete_and_terminate(State = #qi { dir = Dir,
end, OpenFds),
file_handle_cache:release_reservation(),
%% Erase the data on disk.
ok = erase_index_dir(Dir),
ok = erase_index_dir(rabbit_file:binary_to_filename(Dir)),
State#qi{ segments = #{},
fds = #{} }.

Expand Down Expand Up @@ -1274,7 +1278,8 @@ queue_name_to_dir_name(#resource { kind = queue,
rabbit_misc:format("~.36B", [Num]).

segment_file(Segment, #qi{ dir = Dir }) ->
filename:join(Dir, integer_to_list(Segment) ++ ?SEGMENT_EXTENSION).
filename:join(rabbit_file:binary_to_filename(Dir),
integer_to_list(Segment) ++ ?SEGMENT_EXTENSION).

highest_continuous_seq_id([SeqId|Tail], EndSeqId)
when (1 + SeqId) =:= EndSeqId ->
Expand Down
8 changes: 5 additions & 3 deletions deps/rabbit/src/rabbit_classic_queue_store_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@

-record(qs, {
%% Store directory - same as the queue index.
dir :: file:filename(),
%% Stored as binary() as opposed to file:filename() to save memory.
dir :: binary(),

%% We keep up to one write fd open at any time.
%% Because queues are FIFO, writes are mostly sequential
Expand Down Expand Up @@ -140,7 +141,7 @@ init(#resource{ virtual_host = VHost } = Name, OnSyncFun) ->
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
Dir = rabbit_classic_queue_index_v2:queue_dir(VHostDir, Name),
#qs{
dir = Dir,
dir = rabbit_file:filename_to_binary(Dir),
on_sync = OnSyncFun
}.

Expand Down Expand Up @@ -497,4 +498,5 @@ check_crc32() ->
%% Same implementation as rabbit_classic_queue_index_v2:segment_file/2,
%% but with a different state record.
segment_file(Segment, #qs{ dir = Dir }) ->
filename:join(Dir, integer_to_list(Segment) ++ ?SEGMENT_EXTENSION).
filename:join(rabbit_file:binary_to_filename(Dir),
integer_to_list(Segment) ++ ?SEGMENT_EXTENSION).
21 changes: 21 additions & 0 deletions deps/rabbit/src/rabbit_file.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
-export([lock_file/1]).
-export([read_file_info/1]).
-export([filename_as_a_directory/1]).
-export([filename_to_binary/1, binary_to_filename/1]).

-import(file_handle_cache, [with_handle/1, with_handle/2]).

Expand Down Expand Up @@ -328,3 +329,23 @@ filename_as_a_directory(FileName) ->
_ ->
FileName ++ "/"
end.

-spec filename_to_binary(file:filename()) ->
binary().
filename_to_binary(Name) when is_list(Name) ->
case unicode:characters_to_binary(Name, unicode, file:native_name_encoding()) of
Bin when is_binary(Bin) ->
Bin;
Other ->
erlang:error(Other)
end.

-spec binary_to_filename(binary()) ->
file:filename().
binary_to_filename(Bin) when is_binary(Bin) ->
case unicode:characters_to_list(Bin, file:native_name_encoding()) of
Name when is_list(Name) ->
Name;
Other ->
erlang:error(Other)
end.
15 changes: 11 additions & 4 deletions deps/rabbit/src/rabbit_msg_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
-record(msstate,
{
%% store directory
dir,
dir :: file:filename(),
%% the module for index ops,
%% rabbit_msg_store_ets_index by default
index_module,
Expand Down Expand Up @@ -151,7 +151,8 @@
file_handle_cache :: map(),
index_state :: any(),
index_module :: atom(),
dir :: file:filename(),
%% Stored as binary() as opposed to file:filename() to save memory.
dir :: binary(),
gc_pid :: pid(),
file_handles_ets :: ets:tid(),
file_summary_ets :: ets:tid(),
Expand Down Expand Up @@ -473,7 +474,7 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) when is_pid(Server); is_atom
file_handle_cache = #{},
index_state = IState,
index_module = IModule,
dir = Dir,
dir = rabbit_file:filename_to_binary(Dir),
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
Expand Down Expand Up @@ -1507,11 +1508,17 @@ get_read_handle(FileNum, State = #msstate { file_handle_cache = FHC,
get_read_handle(FileNum, FHC, Dir) ->
case maps:find(FileNum, FHC) of
{ok, Hdl} -> {Hdl, FHC};
error -> {ok, Hdl} = open_file(Dir, filenum_to_name(FileNum),
error -> {ok, Hdl} = open_file(to_filename(Dir),
filenum_to_name(FileNum),
?READ_MODE),
{Hdl, maps:put(FileNum, Hdl, FHC)}
end.

to_filename(Name) when is_list(Name) ->
Name;
to_filename(Bin) when is_binary(Bin) ->
rabbit_file:binary_to_filename(Bin).

preallocate(Hdl, FileSizeLimit, FinalPos) ->
{ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit),
ok = file_handle_cache:truncate(Hdl),
Expand Down
12 changes: 8 additions & 4 deletions deps/rabbit/src/rabbit_msg_store_ets_index.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,21 @@
-define(MSG_LOC_NAME, rabbit_msg_store_ets_index).
-define(FILENAME, "msg_store_index.ets").

-record(state, { table, dir }).
-record(state,
{table,
%% Stored as binary() as opposed to file:filename() to save memory.
dir :: binary()}).

new(Dir) ->
file:delete(filename:join(Dir, ?FILENAME)),
Tid = ets:new(?MSG_LOC_NAME, [set, public, {keypos, #msg_location.msg_id}]),
#state { table = Tid, dir = Dir }.
#state { table = Tid, dir = rabbit_file:filename_to_binary(Dir) }.

recover(Dir) ->
Path = filename:join(Dir, ?FILENAME),
case ets:file2tab(Path) of
{ok, Tid} -> file:delete(Path),
{ok, #state { table = Tid, dir = Dir }};
{ok, #state { table = Tid, dir = rabbit_file:filename_to_binary(Dir) }};
Error -> Error
end.

Expand Down Expand Up @@ -64,7 +67,8 @@ clean_up_temporary_reference_count_entries_without_file(State) ->
ets:select_delete(State #state.table, [{MatchHead, [], [true]}]),
ok.

terminate(#state { table = MsgLocations, dir = Dir }) ->
terminate(#state { table = MsgLocations, dir = DirBin }) ->
Dir = rabbit_file:binary_to_filename(DirBin),
case ets:tab2file(MsgLocations, filename:join(Dir, ?FILENAME),
[{extended_info, [object_count]}]) of
ok -> ok;
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/test/rabbit_stream_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ consume(Config) ->
_ = amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = <<"ctag">>}),
ok = amqp_channel:close(Ch1)
after 5000 ->
exit(timeout)
ct:fail(timeout)
end,
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).

Expand Down
122 changes: 122 additions & 0 deletions deps/rabbit/test/unicode_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
-module(unicode_SUITE).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").

-compile(export_all).

%% Unicode U+1F407
-define(UNICODE_STRING, "bunny🐇bunny").

all() ->
[
{group, queues}
].

groups() ->
[
{queues, [], [
classic_queue_v1,
classic_queue_v2,
quorum_queue,
stream
]}
].

%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------

init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config).

end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).

init_per_group(Group, Config0) ->
PrivDir0 = ?config(priv_dir, Config0),
PrivDir = filename:join(PrivDir0, ?UNICODE_STRING),
ok = file:make_dir(PrivDir),
Config = rabbit_ct_helpers:set_config(Config0, [{priv_dir, PrivDir},
{rmq_nodename_suffix, Group}]),
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()
).

end_per_group(_, Config) ->
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).

init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).

end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).

classic_queue_v1(Config) ->
ok = rabbit_ct_broker_helpers:rpc(
Config, 0, application, set_env, [rabbit, classic_queue_default_version, 1]),
ok = queue(Config, ?FUNCTION_NAME, []).

classic_queue_v2(Config) ->
ok = rabbit_ct_broker_helpers:rpc(
Config, 0, application, set_env, [rabbit, classic_queue_default_version, 2]),
ok = queue(Config, ?FUNCTION_NAME, []).

quorum_queue(Config) ->
ok = queue(Config, ?FUNCTION_NAME, [{<<"x-queue-type">>, longstr, <<"quorum">>}]).

queue(Config, QName0, Args) ->
QName1 = rabbit_data_coercion:to_binary(QName0),
QName = <<QName1/binary, ?UNICODE_STRING/utf8>>,
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
amqp_channel:call(Ch, #'queue.declare'{queue = QName,
durable = true,
arguments = Args
}),
rabbit_ct_client_helpers:publish(Ch, QName, 1),
{#'basic.get_ok'{}, #amqp_msg{payload = <<"1">>}} =
amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = false}),
{'queue.delete_ok', 0} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok.

stream(Config) ->
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, stream_queue),
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
ConsumerTag = QName0 = atom_to_binary(?FUNCTION_NAME),
QName = <<QName0/binary, ?UNICODE_STRING/utf8>>,
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
amqp_channel:call(Ch, #'queue.declare'{queue = QName,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]
}),
rabbit_ct_client_helpers:publish(Ch, QName, 1),
?assertMatch(#'basic.qos_ok'{},
amqp_channel:call(Ch, #'basic.qos'{global = false,
prefetch_count = 1})),
amqp_channel:subscribe(Ch,
#'basic.consume'{queue = QName,
no_ack = false,
consumer_tag = ConsumerTag,
arguments = [{<<"x-stream-offset">>, long, 0}]},
self()),
receive
#'basic.consume_ok'{consumer_tag = ConsumerTag} ->
ok
end,
DelTag = receive
{#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
DeliveryTag
after 5000 ->
ct:fail(timeout)
end,
ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DelTag,
multiple = false}),
amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = ConsumerTag}),
{'queue.delete_ok', 0} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok.
4 changes: 2 additions & 2 deletions deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -727,12 +727,12 @@ do_start_rabbitmq_node(Config, NodeConfig, I) ->
{"RABBITMQ_NODENAME=~s", [Nodename]},
{"RABBITMQ_NODENAME_FOR_PATHS=~s", [InitialNodename]},
{"RABBITMQ_DIST_PORT=~b", [DistPort]},
{"RABBITMQ_CONFIG_FILE=~s", [ConfigFile]},
{"RABBITMQ_CONFIG_FILE=~ts", [ConfigFile]},
{"RABBITMQ_SERVER_START_ARGS=~s", [StartArgs1]},
"RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=+S 2 +sbwt very_short +A 24",
"RABBITMQ_LOG=debug",
"RMQCTL_WAIT_TIMEOUT=180",
{"TEST_TMPDIR=~s", [PrivDir]}
{"TEST_TMPDIR=~ts", [PrivDir]}
| ExtraArgs],
Cmd = ["start-background-broker" | MakeVars],
case rabbit_ct_helpers:get_config(Config, rabbitmq_run_cmd) of
Expand Down