diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 8f2a7391a111..e7b0e10838df 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -1064,6 +1064,11 @@ suites = [ "@proper//:erlang_app", ], ), + rabbitmq_integration_suite( + PACKAGE, + name = "unicode_SUITE", + size = "small", + ), ] assert_suites( diff --git a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl index 80cfe0d8f26b..bd00af998fe0 100644 --- a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl +++ b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl @@ -71,7 +71,8 @@ queue_name :: rabbit_amqqueue:name(), %% Queue index directory. - dir :: file:filename(), + %% Stored as binary() as opposed to file:filename() to save memory. + dir :: binary(), %% Buffer of all write operations to be performed. %% When the buffer reaches a certain size, we reduce @@ -197,7 +198,7 @@ init1(Name, Dir, OnSyncFun, OnSyncMsgFun) -> ensure_queue_name_stub_file(Name, Dir), #qi{ queue_name = Name, - dir = Dir, + dir = rabbit_file:filename_to_binary(Dir), on_sync = OnSyncFun, on_sync_msg = OnSyncMsgFun }. @@ -217,7 +218,7 @@ reset_state(State = #qi{ queue_name = Name, on_sync_msg = OnSyncMsgFun }) -> ?DEBUG("~0p", [State]), delete_and_terminate(State), - init1(Name, Dir, OnSyncFun, OnSyncMsgFun). + init1(Name, rabbit_file:binary_to_filename(Dir), OnSyncFun, OnSyncMsgFun). -spec recover(rabbit_amqqueue:name(), shutdown_terms(), boolean(), contains_predicate(), @@ -277,8 +278,9 @@ recover(#resource{ virtual_host = VHost, name = QueueName } = Name, Terms, State} end. -recover_segments(State0 = #qi { queue_name = Name, dir = Dir }, Terms, IsMsgStoreClean, +recover_segments(State0 = #qi { queue_name = Name, dir = DirBin }, Terms, IsMsgStoreClean, ContainsCheckFun, OnSyncFun, OnSyncMsgFun, CountersRef, Context) -> + Dir = rabbit_file:binary_to_filename(DirBin), SegmentFiles = rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir), State = case SegmentFiles of %% No segments found. @@ -479,8 +481,9 @@ recover_index_v1_dirty(State0 = #qi{ queue_name = Name }, Terms, IsMsgStoreClean %% At this point all messages are persistent because transient messages %% were dropped during the v1 index recovery. -recover_index_v1_common(State0 = #qi{ queue_name = Name, dir = Dir }, +recover_index_v1_common(State0 = #qi{ queue_name = Name, dir = DirBin }, V1State, CountersRef) -> + Dir = rabbit_file:binary_to_filename(DirBin), %% Use a temporary per-queue store state to store embedded messages. StoreState0 = rabbit_classic_queue_store_v2:init(Name, fun(_, _) -> ok end), %% Go through the v1 index and publish messages to the v2 index. @@ -539,7 +542,8 @@ terminate(VHost, Terms, State0 = #qi { dir = Dir, end, OpenFds), file_handle_cache:release_reservation(), %% Write recovery terms for faster recovery. - rabbit_recovery_terms:store(VHost, filename:basename(Dir), + rabbit_recovery_terms:store(VHost, + filename:basename(rabbit_file:binary_to_filename(Dir)), [{v2_index_state, {?VERSION, Segments}} | Terms]), State#qi{ segments = #{}, fds = #{} }. @@ -555,7 +559,7 @@ delete_and_terminate(State = #qi { dir = Dir, end, OpenFds), file_handle_cache:release_reservation(), %% Erase the data on disk. - ok = erase_index_dir(Dir), + ok = erase_index_dir(rabbit_file:binary_to_filename(Dir)), State#qi{ segments = #{}, fds = #{} }. @@ -1274,7 +1278,8 @@ queue_name_to_dir_name(#resource { kind = queue, rabbit_misc:format("~.36B", [Num]). segment_file(Segment, #qi{ dir = Dir }) -> - filename:join(Dir, integer_to_list(Segment) ++ ?SEGMENT_EXTENSION). + filename:join(rabbit_file:binary_to_filename(Dir), + integer_to_list(Segment) ++ ?SEGMENT_EXTENSION). highest_continuous_seq_id([SeqId|Tail], EndSeqId) when (1 + SeqId) =:= EndSeqId -> diff --git a/deps/rabbit/src/rabbit_classic_queue_store_v2.erl b/deps/rabbit/src/rabbit_classic_queue_store_v2.erl index 9b76fe731303..bd8dc7937710 100644 --- a/deps/rabbit/src/rabbit_classic_queue_store_v2.erl +++ b/deps/rabbit/src/rabbit_classic_queue_store_v2.erl @@ -75,7 +75,8 @@ -record(qs, { %% Store directory - same as the queue index. - dir :: file:filename(), + %% Stored as binary() as opposed to file:filename() to save memory. + dir :: binary(), %% We keep up to one write fd open at any time. %% Because queues are FIFO, writes are mostly sequential @@ -140,7 +141,7 @@ init(#resource{ virtual_host = VHost } = Name, OnSyncFun) -> VHostDir = rabbit_vhost:msg_store_dir_path(VHost), Dir = rabbit_classic_queue_index_v2:queue_dir(VHostDir, Name), #qs{ - dir = Dir, + dir = rabbit_file:filename_to_binary(Dir), on_sync = OnSyncFun }. @@ -497,4 +498,5 @@ check_crc32() -> %% Same implementation as rabbit_classic_queue_index_v2:segment_file/2, %% but with a different state record. segment_file(Segment, #qs{ dir = Dir }) -> - filename:join(Dir, integer_to_list(Segment) ++ ?SEGMENT_EXTENSION). + filename:join(rabbit_file:binary_to_filename(Dir), + integer_to_list(Segment) ++ ?SEGMENT_EXTENSION). diff --git a/deps/rabbit/src/rabbit_file.erl b/deps/rabbit/src/rabbit_file.erl index 4d10dd888bcf..8115be6923df 100644 --- a/deps/rabbit/src/rabbit_file.erl +++ b/deps/rabbit/src/rabbit_file.erl @@ -16,6 +16,7 @@ -export([lock_file/1]). -export([read_file_info/1]). -export([filename_as_a_directory/1]). +-export([filename_to_binary/1, binary_to_filename/1]). -import(file_handle_cache, [with_handle/1, with_handle/2]). @@ -328,3 +329,23 @@ filename_as_a_directory(FileName) -> _ -> FileName ++ "/" end. + +-spec filename_to_binary(file:filename()) -> + binary(). +filename_to_binary(Name) when is_list(Name) -> + case unicode:characters_to_binary(Name, unicode, file:native_name_encoding()) of + Bin when is_binary(Bin) -> + Bin; + Other -> + erlang:error(Other) + end. + +-spec binary_to_filename(binary()) -> + file:filename(). +binary_to_filename(Bin) when is_binary(Bin) -> + case unicode:characters_to_list(Bin, file:native_name_encoding()) of + Name when is_list(Name) -> + Name; + Other -> + erlang:error(Other) + end. diff --git a/deps/rabbit/src/rabbit_msg_store.erl b/deps/rabbit/src/rabbit_msg_store.erl index f9542378d85b..c4b1449ac968 100644 --- a/deps/rabbit/src/rabbit_msg_store.erl +++ b/deps/rabbit/src/rabbit_msg_store.erl @@ -52,7 +52,7 @@ -record(msstate, { %% store directory - dir, + dir :: file:filename(), %% the module for index ops, %% rabbit_msg_store_ets_index by default index_module, @@ -151,7 +151,8 @@ file_handle_cache :: map(), index_state :: any(), index_module :: atom(), - dir :: file:filename(), + %% Stored as binary() as opposed to file:filename() to save memory. + dir :: binary(), gc_pid :: pid(), file_handles_ets :: ets:tid(), file_summary_ets :: ets:tid(), @@ -473,7 +474,7 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) when is_pid(Server); is_atom file_handle_cache = #{}, index_state = IState, index_module = IModule, - dir = Dir, + dir = rabbit_file:filename_to_binary(Dir), gc_pid = GCPid, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, @@ -1507,11 +1508,17 @@ get_read_handle(FileNum, State = #msstate { file_handle_cache = FHC, get_read_handle(FileNum, FHC, Dir) -> case maps:find(FileNum, FHC) of {ok, Hdl} -> {Hdl, FHC}; - error -> {ok, Hdl} = open_file(Dir, filenum_to_name(FileNum), + error -> {ok, Hdl} = open_file(to_filename(Dir), + filenum_to_name(FileNum), ?READ_MODE), {Hdl, maps:put(FileNum, Hdl, FHC)} end. +to_filename(Name) when is_list(Name) -> + Name; +to_filename(Bin) when is_binary(Bin) -> + rabbit_file:binary_to_filename(Bin). + preallocate(Hdl, FileSizeLimit, FinalPos) -> {ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit), ok = file_handle_cache:truncate(Hdl), diff --git a/deps/rabbit/src/rabbit_msg_store_ets_index.erl b/deps/rabbit/src/rabbit_msg_store_ets_index.erl index 1358660633b2..c2ab8d0932c1 100644 --- a/deps/rabbit/src/rabbit_msg_store_ets_index.erl +++ b/deps/rabbit/src/rabbit_msg_store_ets_index.erl @@ -18,18 +18,21 @@ -define(MSG_LOC_NAME, rabbit_msg_store_ets_index). -define(FILENAME, "msg_store_index.ets"). --record(state, { table, dir }). +-record(state, + {table, + %% Stored as binary() as opposed to file:filename() to save memory. + dir :: binary()}). new(Dir) -> file:delete(filename:join(Dir, ?FILENAME)), Tid = ets:new(?MSG_LOC_NAME, [set, public, {keypos, #msg_location.msg_id}]), - #state { table = Tid, dir = Dir }. + #state { table = Tid, dir = rabbit_file:filename_to_binary(Dir) }. recover(Dir) -> Path = filename:join(Dir, ?FILENAME), case ets:file2tab(Path) of {ok, Tid} -> file:delete(Path), - {ok, #state { table = Tid, dir = Dir }}; + {ok, #state { table = Tid, dir = rabbit_file:filename_to_binary(Dir) }}; Error -> Error end. @@ -64,7 +67,8 @@ clean_up_temporary_reference_count_entries_without_file(State) -> ets:select_delete(State #state.table, [{MatchHead, [], [true]}]), ok. -terminate(#state { table = MsgLocations, dir = Dir }) -> +terminate(#state { table = MsgLocations, dir = DirBin }) -> + Dir = rabbit_file:binary_to_filename(DirBin), case ets:tab2file(MsgLocations, filename:join(Dir, ?FILENAME), [{extended_info, [object_count]}]) of ok -> ok; diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 59aa6698c737..aacbafbf9930 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -906,7 +906,7 @@ consume(Config) -> _ = amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = <<"ctag">>}), ok = amqp_channel:close(Ch1) after 5000 -> - exit(timeout) + ct:fail(timeout) end, rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). diff --git a/deps/rabbit/test/unicode_SUITE.erl b/deps/rabbit/test/unicode_SUITE.erl new file mode 100644 index 000000000000..67b4f2ed848f --- /dev/null +++ b/deps/rabbit/test/unicode_SUITE.erl @@ -0,0 +1,122 @@ +-module(unicode_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). + +%% Unicode U+1F407 +-define(UNICODE_STRING, "bunny🐇bunny"). + +all() -> + [ + {group, queues} + ]. + +groups() -> + [ + {queues, [], [ + classic_queue_v1, + classic_queue_v2, + quorum_queue, + stream + ]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(Group, Config0) -> + PrivDir0 = ?config(priv_dir, Config0), + PrivDir = filename:join(PrivDir0, ?UNICODE_STRING), + ok = file:make_dir(PrivDir), + Config = rabbit_ct_helpers:set_config(Config0, [{priv_dir, PrivDir}, + {rmq_nodename_suffix, Group}]), + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps() + ). + +end_per_group(_, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +classic_queue_v1(Config) -> + ok = rabbit_ct_broker_helpers:rpc( + Config, 0, application, set_env, [rabbit, classic_queue_default_version, 1]), + ok = queue(Config, ?FUNCTION_NAME, []). + +classic_queue_v2(Config) -> + ok = rabbit_ct_broker_helpers:rpc( + Config, 0, application, set_env, [rabbit, classic_queue_default_version, 2]), + ok = queue(Config, ?FUNCTION_NAME, []). + +quorum_queue(Config) -> + ok = queue(Config, ?FUNCTION_NAME, [{<<"x-queue-type">>, longstr, <<"quorum">>}]). + +queue(Config, QName0, Args) -> + QName1 = rabbit_data_coercion:to_binary(QName0), + QName = <>, + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + amqp_channel:call(Ch, #'queue.declare'{queue = QName, + durable = true, + arguments = Args + }), + rabbit_ct_client_helpers:publish(Ch, QName, 1), + {#'basic.get_ok'{}, #amqp_msg{payload = <<"1">>}} = + amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = false}), + {'queue.delete_ok', 0} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), + ok. + +stream(Config) -> + ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, stream_queue), + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + ConsumerTag = QName0 = atom_to_binary(?FUNCTION_NAME), + QName = <>, + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + amqp_channel:call(Ch, #'queue.declare'{queue = QName, + durable = true, + arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}] + }), + rabbit_ct_client_helpers:publish(Ch, QName, 1), + ?assertMatch(#'basic.qos_ok'{}, + amqp_channel:call(Ch, #'basic.qos'{global = false, + prefetch_count = 1})), + amqp_channel:subscribe(Ch, + #'basic.consume'{queue = QName, + no_ack = false, + consumer_tag = ConsumerTag, + arguments = [{<<"x-stream-offset">>, long, 0}]}, + self()), + receive + #'basic.consume_ok'{consumer_tag = ConsumerTag} -> + ok + end, + DelTag = receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + DeliveryTag + after 5000 -> + ct:fail(timeout) + end, + ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DelTag, + multiple = false}), + amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = ConsumerTag}), + {'queue.delete_ok', 0} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), + ok. diff --git a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl index 43002d34e3e1..644dbf53c090 100644 --- a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl +++ b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl @@ -727,12 +727,12 @@ do_start_rabbitmq_node(Config, NodeConfig, I) -> {"RABBITMQ_NODENAME=~s", [Nodename]}, {"RABBITMQ_NODENAME_FOR_PATHS=~s", [InitialNodename]}, {"RABBITMQ_DIST_PORT=~b", [DistPort]}, - {"RABBITMQ_CONFIG_FILE=~s", [ConfigFile]}, + {"RABBITMQ_CONFIG_FILE=~ts", [ConfigFile]}, {"RABBITMQ_SERVER_START_ARGS=~s", [StartArgs1]}, "RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=+S 2 +sbwt very_short +A 24", "RABBITMQ_LOG=debug", "RMQCTL_WAIT_TIMEOUT=180", - {"TEST_TMPDIR=~s", [PrivDir]} + {"TEST_TMPDIR=~ts", [PrivDir]} | ExtraArgs], Cmd = ["start-background-broker" | MakeVars], case rabbit_ct_helpers:get_config(Config, rabbitmq_run_cmd) of