diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index 3eda8b4a4e85..7432ed8694ae 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -47,6 +47,7 @@ def all_beam_files(name = "all_beam_files"): "src/rabbit_access_control.erl", "src/rabbit_alarm.erl", "src/rabbit_amqqueue.erl", + "src/rabbit_amqqueue_control.erl", "src/rabbit_amqqueue_process.erl", "src/rabbit_amqqueue_sup.erl", "src/rabbit_amqqueue_sup_sup.erl", @@ -297,6 +298,7 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/rabbit_access_control.erl", "src/rabbit_alarm.erl", "src/rabbit_amqqueue.erl", + "src/rabbit_amqqueue_control.erl", "src/rabbit_amqqueue_process.erl", "src/rabbit_amqqueue_sup.erl", "src/rabbit_amqqueue_sup_sup.erl", @@ -559,6 +561,7 @@ def all_srcs(name = "all_srcs"): "src/rabbit_access_control.erl", "src/rabbit_alarm.erl", "src/rabbit_amqqueue.erl", + "src/rabbit_amqqueue_control.erl", "src/rabbit_amqqueue_process.erl", "src/rabbit_amqqueue_sup.erl", "src/rabbit_amqqueue_sup_sup.erl", diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 6ffff0edae19..f2df0d8695e9 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -53,8 +53,9 @@ -export([delete_crashed/1, delete_crashed/2, delete_crashed_internal/2]). - +-export([delete_with/4, delete_with/6]). -export([pid_of/1, pid_of/2]). +-export([pid_or_crashed/2]). -export([mark_local_durable_queues_stopped/1]). -export([rebalance/3]). @@ -71,6 +72,8 @@ -export([prepend_extra_bcc/1]). -export([queue/1, queue_names/1]). +-export([kill_queue/2, kill_queue/3, kill_queue_hard/2, kill_queue_hard/3]). + %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2, @@ -116,6 +119,7 @@ -define(CONSUMER_INFO_KEYS, [queue_name, channel_pid, consumer_tag, ack_required, prefetch_count, active, activity_status, arguments]). +-define(KILL_QUEUE_DELAY_INTERVAL, 100). 
warn_file_limit() ->
     DurableQueues = find_recoverable_queues(),
@@ -1601,6 +1605,51 @@ delete_immediately_by_resource(Resources) ->
 delete(Q, IfUnused, IfEmpty, ActingUser) ->
     rabbit_queue_type:delete(Q, IfUnused, IfEmpty, ActingUser).
 
+%% Deletes a queue without checking exclusive ownership: no connection pid is
+%% supplied and CheckExclusive is false. See delete_with/6.
+-spec delete_with(amqqueue:amqqueue() | name(), boolean(), boolean(), rabbit_types:username()) ->
+    rabbit_types:ok(integer()) | rabbit_misc:channel_or_connection_exit().
+delete_with(QueueName, IfUnused, IfEmpty, ActingUser) ->
+    delete_with(QueueName, undefined, IfUnused, IfEmpty, ActingUser, false).
+
+%% Deletes the queue identified by an amqqueue record or a queue resource.
+%% Queues reported absent as crashed or stopped are removed through
+%% delete_crashed/2 and, like a queue that is not found, yield {ok, 0}.
+%% in_use and not_empty errors are not returned to the caller: they are
+%% raised through rabbit_misc:precondition_failed/2.
+-spec delete_with(amqqueue:amqqueue() | name(), pid() | undefined, boolean(), boolean(), rabbit_types:username(), boolean()) ->
+    rabbit_types:ok(integer()) | rabbit_misc:channel_or_connection_exit().
+delete_with(AMQQueue, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive) when ?is_amqqueue(AMQQueue) ->
+    delete_with(amqqueue:get_name(AMQQueue), ConnPid, IfUnused, IfEmpty, Username, CheckExclusive);
+delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive) when is_record(QueueName, resource) ->
+    DeleteFun =
+        fun (Q) ->
+                _ = CheckExclusive =:= true andalso check_exclusive_access(Q, ConnPid),
+                rabbit_queue_type:delete(Q, IfUnused, IfEmpty, Username)
+        end,
+    AbsentFun =
+        fun (not_found) ->
+                {ok, 0};
+            ({absent, Q, State}) when State =:= crashed; State =:= stopped ->
+                _ = delete_crashed(Q, Username),
+                {ok, 0};
+            ({absent, Q, Why}) ->
+                absent(Q, Why)
+        end,
+    case with(QueueName, DeleteFun, AbsentFun) of
+        {ok, _} = Deleted ->
+            Deleted;
+        {error, in_use} ->
+            rabbit_misc:precondition_failed("~ts in use", [rabbit_misc:rs(QueueName)]);
+        {error, not_empty} ->
+            rabbit_misc:precondition_failed("~ts not empty", [rabbit_misc:rs(QueueName)]);
+        {error, {exit, _, _}} ->
+            %% rabbit_amqqueue:delete()/delegate:invoke might return {error, {exit, _, _}}
+            {ok, 0};
+        {protocol_error, Type, Reason, ReasonArgs} ->
+            rabbit_misc:protocol_error(Type, Reason, ReasonArgs)
+    end.
+
 %% delete_crashed* INCLUDED FOR BACKWARDS COMPATBILITY REASONS
 delete_crashed(Q) when ?amqqueue_is_classic(Q) ->
     rabbit_classic_queue:delete_crashed(Q).
@@ -2061,3 +2110,81 @@ is_queue_args_combination_permitted(Durable, Exclusive) ->
         true ->
             rabbit_deprecated_features:is_permitted(transient_nonexcl_queues)
     end.
+
+%% Kills the queue process with reason `boom' again and again, pausing
+%% ?KILL_QUEUE_DELAY_INTERVAL ms between attempts, until kill_queue/3 reports
+%% the queue as crashed or stopped.
+-spec kill_queue_hard(node(), name()) -> ok | rabbit_types:error(term()).
+kill_queue_hard(Node, QRes = #resource{kind = queue}) ->
+    kill_queue_hard(Node, QRes, boom).
+
+%% Same as kill_queue_hard/2 with an explicit exit reason. Whatever
+%% kill_queue/3 returns besides a pid, `crashed' or `stopped' is handed back
+%% to the caller unchanged.
+-spec kill_queue_hard(node(), name(), atom()) -> ok | rabbit_types:error(term()).
+kill_queue_hard(Node, QRes = #resource{kind = queue}, Reason) ->
+    case kill_queue(Node, QRes, Reason) of
+        crashed -> ok;
+        stopped -> ok;
+        NewPid when is_pid(NewPid) ->
+            timer:sleep(?KILL_QUEUE_DELAY_INTERVAL),
+            kill_queue_hard(Node, QRes, Reason);
+        Error -> Error
+    end.
+
+%% Kills the queue process once with reason `boom'. See kill_queue/3.
+-spec kill_queue(node(), name()) -> pid() | crashed | stopped | rabbit_types:error(term()).
+kill_queue(Node, QRes = #resource{kind = queue}) ->
+    kill_queue(Node, QRes, boom).
+
+%% Sends an exit signal with Reason to the queue process and waits for the
+%% outcome: for `shutdown' the queue must reach the `stopped' state, for any
+%% other reason the result of rabbit_amqqueue_control:await_new_pid/3 is
+%% returned. If pid_or_crashed/2 does not yield a pid, nothing is killed and
+%% its result is returned as is.
+-spec kill_queue(node(), name(), atom()) -> pid() | crashed | stopped | rabbit_types:error(term()).
+kill_queue(Node, QRes = #resource{kind = queue}, Reason) ->
+    case pid_or_crashed(Node, QRes) of
+        QPid when is_pid(QPid) ->
+            exit(QPid, Reason),
+            await_killed(Node, QRes, QPid, Reason);
+        NotAPid ->
+            %% `crashed' or an error: exit/2 would fail with badarg here
+            NotAPid
+    end.
+
+await_killed(Node, QRes, _OldPid, shutdown) ->
+    ok = rabbit_amqqueue_control:await_state(Node, QRes, stopped),
+    stopped;
+await_killed(Node, QRes, OldPid, _Reason) ->
+    rabbit_amqqueue_control:await_new_pid(Node, QRes, OldPid).
+
+%% Returns the pid recorded for the queue as looked up on Node, `crashed' when
+%% the queue is in the crashed state and its supervisor has no child left, or
+%% {error, Reason} when the lookup fails or the supervisor is missing.
+-spec pid_or_crashed(node(), name()) -> pid() | crashed | rabbit_types:error(term()).
+pid_or_crashed(Node, QRes = #resource{virtual_host = VHost, kind = queue}) ->
+    case rpc:call(Node, rabbit_amqqueue, lookup, [QRes]) of
+        {ok, Q} ->
+            QPid = amqqueue:get_pid(Q),
+            case amqqueue:get_state(Q) of
+                crashed -> restarting_pid_or_crashed(Node, VHost, QPid);
+                _       -> QPid
+            end;
+        Error = {error, _} -> Error;
+        Reason -> {error, Reason}
+    end.
+
+%% The queue record says `crashed': consult the supervisor returned by
+%% rabbit_amqqueue_sup_sup:find_for_vhost/2 to tell a restart in progress
+%% from a queue that was given up on.
+restarting_pid_or_crashed(Node, VHost, QPid) ->
+    case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of
+        {error, {queue_supervisor_not_found, _}} ->
+            {error, no_sup};
+        {ok, SPid} ->
+            case rabbit_misc:remote_sup_child(Node, SPid) of
+                {ok, _}            -> QPid;    %% restarting
+                {error, no_child}  -> crashed; %% given up
+                {error, _} = Error -> Error    %% e.g. no_sup
+            end
+    end.
diff --git a/deps/rabbit/src/rabbit_amqqueue_control.erl b/deps/rabbit/src/rabbit_amqqueue_control.erl
new file mode 100644
index 000000000000..d38e878d85bb
--- /dev/null
+++ b/deps/rabbit/src/rabbit_amqqueue_control.erl
@@ -0,0 +1,72 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% Polling helpers used to wait until a queue reaches a given state or is
+%% reported under a different pid, e.g. after its process was killed.
+
+-module(rabbit_amqqueue_control).
+
+-export([await_new_pid/3, await_state/3, await_state/4]).
+
+-define(DEFAULT_AWAIT_STATE_TIMEOUT, 30000).
+-define(AWAIT_NEW_PID_DELAY_INTERVAL, 10).
+-define(AWAIT_STATE_DELAY_INTERVAL, 100).
+
+-include_lib("rabbit_common/include/resource.hrl").
+
+%% Polls rabbit_amqqueue:pid_or_crashed/2 every ?AWAIT_NEW_PID_DELAY_INTERVAL ms
+%% until it returns something other than OldPid, and returns that value. Note
+%% that this may be `crashed' or {error, _} rather than a pid, and that there
+%% is no timeout.
+-spec await_new_pid(node(), rabbit_amqqueue:name(), pid()) ->
+          pid() | crashed | rabbit_types:error(term()).
+await_new_pid(Node, QRes = #resource{kind = queue}, OldPid) ->
+    case rabbit_amqqueue:pid_or_crashed(Node, QRes) of
+        OldPid -> timer:sleep(?AWAIT_NEW_PID_DELAY_INTERVAL),
+                  await_new_pid(Node, QRes, OldPid);
+        New    -> New
+    end.
+
+%% await_state/4 with a Time budget of ?DEFAULT_AWAIT_STATE_TIMEOUT.
+-spec await_state(node(), rabbit_amqqueue:name() | binary(), atom()) -> 'ok'.
+await_state(Node, QName, State) ->
+    await_state(Node, QName, State, ?DEFAULT_AWAIT_STATE_TIMEOUT).
+
+%% Polls the queue state on Node every ?AWAIT_STATE_DELAY_INTERVAL ms until it
+%% equals State. A binary queue name is resolved in the vhost <<"/">>.
+%% Time is reduced by the polling interval on every retry; once it is used up
+%% the caller exits with {timeout_awaiting_state, State, LastSeenState}.
+-spec await_state(node(), rabbit_amqqueue:name() | binary(), atom(), integer()) -> 'ok'.
+await_state(Node, QName, State, Time) when is_binary(QName) ->
+    await_state(Node, rabbit_misc:r(<<"/">>, queue, QName), State, Time);
+await_state(Node, QRes = #resource{kind = queue}, State, Time) ->
+    case state(Node, QRes) of
+        State ->
+            ok;
+        Other when Time =< 0 ->
+            %% =< rather than =:= 0: a Time that is not a multiple of the
+            %% polling interval would otherwise step over zero and never
+            %% time out
+            exit({timeout_awaiting_state, State, Other});
+        _Other ->
+            timer:sleep(?AWAIT_STATE_DELAY_INTERVAL),
+            await_state(Node, QRes, State, Time - ?AWAIT_STATE_DELAY_INTERVAL)
+    end.
+
+%% Current state of the queue as listed by rabbit_amqqueue:info_all/2 on Node:
+%% `undefined' when the queue is not listed, the {badrpc, _} tuple when the
+%% call fails (so that await_state/4 keeps polling until its timeout).
+state(Node, QRes = #resource{virtual_host = VHost, kind = queue}) ->
+    case rpc:call(Node, rabbit_amqqueue, info_all, [VHost, [name, state]]) of
+        {badrpc, _} = BadRpc -> BadRpc;
+        Infos                -> fetch_state(QRes, Infos)
+    end.
+
+fetch_state(_QRes, []) -> undefined;
+fetch_state(QRes, [[{name, QRes}, {state, State}] | _]) -> State;
+fetch_state(QRes, [[{name, _}, {state, _State}] | Rem]) ->
+    fetch_state(QRes, Rem).
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl
index fa97da627527..fa85e1d2268a 100644
--- a/deps/rabbit/src/rabbit_channel.erl
+++ b/deps/rabbit/src/rabbit_channel.erl
@@ -930,15 +930,6 @@ handle_exception(Reason, State = #ch{cfg = #conf{protocol = Protocol,
             {stop, normal, State1}
     end.
 
--spec precondition_failed(string()) -> no_return().
-
-precondition_failed(Format) -> precondition_failed(Format, []).
-
--spec precondition_failed(string(), [any()]) -> no_return().
-
-precondition_failed(Format, Params) ->
-    rabbit_misc:protocol_error(precondition_failed, Format, Params).
- return_queue_declare_ok(#resource{name = ActualName}, NoWait, MessageCount, ConsumerCount, #ch{cfg = Cfg} = State) -> @@ -995,7 +986,7 @@ check_user_id_header(#'P_basic'{user_id = Claimed}, tags = Tags}}}) -> case lists:member(impersonator, Tags) of true -> ok; - false -> precondition_failed( + false -> rabbit_misc:precondition_failed( "user_id property set to '~ts' but authenticated user was " "'~ts'", [Claimed, Actual]) end. @@ -1003,7 +994,7 @@ check_user_id_header(#'P_basic'{user_id = Claimed}, check_expiration_header(Props) -> case rabbit_basic:parse_expiration(Props) of {ok, _} -> ok; - {error, E} -> precondition_failed("invalid expiration '~ts': ~tp", + {error, E} -> rabbit_misc:precondition_failed("invalid expiration '~ts': ~tp", [Props#'P_basic'.expiration, E]) end. @@ -1074,7 +1065,7 @@ check_msg_size(Content, MaxMessageSize, GCThreshold) -> _ -> "message size ~B is larger than configured max size ~B" end, - precondition_failed(ErrorMessage, + rabbit_misc:precondition_failed(ErrorMessage, [Size, MaxMessageSize]); _ -> ok end. 
@@ -1082,7 +1073,7 @@ check_msg_size(Content, MaxMessageSize, GCThreshold) -> check_vhost_queue_limit(#resource{name = QueueName}, VHost) -> case rabbit_vhost_limit:is_over_queue_limit(VHost) of false -> ok; - {true, Limit} -> precondition_failed("cannot declare queue '~ts': " + {true, Limit} -> rabbit_misc:precondition_failed("cannot declare queue '~ts': " "queue limit in vhost '~ts' (~tp) is reached", [QueueName, VHost, Limit]) @@ -1704,7 +1695,7 @@ handle_method(#'queue.purge'{nowait = NoWait} = Method, end; handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) -> - precondition_failed("cannot switch from confirm to tx mode"); + rabbit_misc:precondition_failed("cannot switch from confirm to tx mode"); handle_method(#'tx.select'{}, _, State = #ch{tx = none}) -> {reply, #'tx.select_ok'{}, State#ch{tx = new_tx()}}; @@ -1713,7 +1704,7 @@ handle_method(#'tx.select'{}, _, State) -> {reply, #'tx.select_ok'{}, State}; handle_method(#'tx.commit'{}, _, #ch{tx = none}) -> - precondition_failed("channel is not transactional"); + rabbit_misc:precondition_failed("channel is not transactional"); handle_method(#'tx.commit'{}, _, State = #ch{tx = {Deliveries, Acks}, limiter = Limiter}) -> @@ -1731,7 +1722,7 @@ handle_method(#'tx.commit'{}, _, State = #ch{tx = {Deliveries, Acks}, {noreply, maybe_complete_tx(State3#ch{tx = committing})}; handle_method(#'tx.rollback'{}, _, #ch{tx = none}) -> - precondition_failed("channel is not transactional"); + rabbit_misc:precondition_failed("channel is not transactional"); handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, tx = {_Msgs, Acks}}) -> @@ -1741,7 +1732,7 @@ handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, tx = new_tx()}}; handle_method(#'confirm.select'{}, _, #ch{tx = {_, _}}) -> - precondition_failed("cannot switch from tx to confirm mode"); + rabbit_misc:precondition_failed("cannot switch from tx to confirm mode"); handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> 
return_ok(State#ch{confirm_enabled = true}, @@ -1762,7 +1753,7 @@ handle_method(#'basic.credit'{consumer_tag = CTag, {ok, {Q, _CParams}} -> {ok, QStates, Actions} = rabbit_queue_type:credit(Q, CTag, Credit, Drain, QStates0), {noreply, handle_queue_actions(Actions, State#ch{queue_states = QStates})}; - error -> precondition_failed( + error -> rabbit_misc:precondition_failed( "unknown consumer tag '~ts'", [CTag]) end; @@ -2050,7 +2041,7 @@ collect_acks(AcknowledgedAcc, RemainingAcc, UAMQ, DeliveryTag, Multiple) -> UAMQTail, DeliveryTag, Multiple) end; {empty, _} -> - precondition_failed("unknown delivery tag ~w", [DeliveryTag]) + rabbit_misc:precondition_failed("unknown delivery tag ~w", [DeliveryTag]) end. %% Settles (acknowledges) messages at the queue replica process level. @@ -2540,7 +2531,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, undefined -> ok; {error, {invalid_type, Type}} -> - precondition_failed( + rabbit_misc:precondition_failed( "invalid type '~ts' for arg '~ts' in ~ts", [Type, DlxKey, rabbit_misc:rs(QueueName)]); DLX -> @@ -2605,35 +2596,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, QueueName = qbin_to_resource(StrippedQueueNameBin, VHostPath), check_configure_permitted(QueueName, User, AuthzContext), - case rabbit_amqqueue:with( - QueueName, - fun (Q) -> - rabbit_amqqueue:check_exclusive_access(Q, ConnPid), - rabbit_queue_type:delete(Q, IfUnused, IfEmpty, Username) - end, - fun (not_found) -> - {ok, 0}; - ({absent, Q, crashed}) -> - _ = rabbit_classic_queue:delete_crashed(Q, Username), - {ok, 0}; - ({absent, Q, stopped}) -> - _ = rabbit_classic_queue:delete_crashed(Q, Username), - {ok, 0}; - ({absent, Q, Reason}) -> - rabbit_amqqueue:absent(Q, Reason) - end) of - {error, in_use} -> - precondition_failed("~ts in use", [rabbit_misc:rs(QueueName)]); - {error, not_empty} -> - precondition_failed("~ts not empty", [rabbit_misc:rs(QueueName)]); - {error, {exit, _, _}} -> - %% rabbit_amqqueue:delete()/delegate:invoke might return 
{error, {exit, _, _}} - {ok, 0}; - {ok, Count} -> - {ok, Count}; - {protocol_error, Type, Reason, ReasonArgs} -> - rabbit_misc:protocol_error(Type, Reason, ReasonArgs) - end; + rabbit_amqqueue:delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, true); handle_method(#'exchange.delete'{exchange = ExchangeNameBin, if_unused = IfUnused}, _ConnPid, AuthzContext, _CollectorPid, VHostPath, @@ -2647,7 +2610,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, {error, not_found} -> ok; {error, in_use} -> - precondition_failed("~ts in use", [rabbit_misc:rs(ExchangeName)]); + rabbit_misc:precondition_failed("~ts in use", [rabbit_misc:rs(ExchangeName)]); ok -> ok end; @@ -2689,7 +2652,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, case rabbit_misc:r_arg(VHostPath, exchange, Args, AeKey) of undefined -> ok; {error, {invalid_type, Type}} -> - precondition_failed( + rabbit_misc:precondition_failed( "invalid type '~ts' for arg '~ts' in ~ts", [Type, AeKey, rabbit_misc:rs(ExchangeName)]); AName -> check_read_permitted(ExchangeName, User, AuthzContext), diff --git a/deps/rabbit/test/crashing_queues_SUITE.erl b/deps/rabbit/test/crashing_queues_SUITE.erl index ffd73f2fc89f..55fe8acd54b8 100644 --- a/deps/rabbit/test/crashing_queues_SUITE.erl +++ b/deps/rabbit/test/crashing_queues_SUITE.erl @@ -96,7 +96,8 @@ test_queue_failure(Node, Ch, RaceConn, MsgCount, FollowerCount, Decl) -> publish(Ch, QName, transient), publish(Ch, QName, durable), Racer = spawn_declare_racer(RaceConn, Decl), - kill_queue(Node, QName), + QRes = rabbit_misc:r(<<"/">>, queue, QName), + rabbit_amqqueue:kill_queue(Node, QRes), assert_message_count(MsgCount, Ch, QName), assert_follower_count(FollowerCount, Node, QName), stop_declare_racer(Racer) @@ -112,21 +113,22 @@ give_up_after_repeated_crashes(Config) -> amqp_channel:call(ChA, #'confirm.select'{}), amqp_channel:call(ChA, #'queue.declare'{queue = QName, durable = true}), - await_state(A, QName, running), + 
rabbit_amqqueue_control:await_state(A, QName, running), publish(ChA, QName, durable), - kill_queue_hard(A, QName), + QRes = rabbit_misc:r(<<"/">>, queue, QName), + rabbit_amqqueue:kill_queue_hard(A, QRes), {'EXIT', _} = (catch amqp_channel:call( ChA, #'queue.declare'{queue = QName, durable = true})), - await_state(A, QName, crashed), + rabbit_amqqueue_control:await_state(A, QName, crashed), amqp_channel:call(ChB, #'queue.delete'{queue = QName}), amqp_channel:call(ChB, #'queue.declare'{queue = QName, durable = true}), - await_state(A, QName, running), + rabbit_amqqueue_control:await_state(A, QName, running), %% Since it's convenient, also test absent queue status here. rabbit_ct_broker_helpers:stop_node(Config, B), - await_state(A, QName, down), + rabbit_amqqueue_control:await_state(A, QName, down), ok. @@ -172,79 +174,11 @@ declare_racer_loop(Parent, Conn, Decl) -> declare_racer_loop(Parent, Conn, Decl) end. -await_state(Node, QName, State) -> - await_state(Node, QName, State, 30000). - -await_state(Node, QName, State, Time) -> - case state(Node, QName) of - State -> - ok; - Other -> - case Time of - 0 -> exit({timeout_awaiting_state, State, Other}); - _ -> timer:sleep(100), - await_state(Node, QName, State, Time - 100) - end - end. - -state(Node, QName) -> - V = <<"/">>, - Res = rabbit_misc:r(V, queue, QName), - Infos = rpc:call(Node, rabbit_amqqueue, info_all, [V, [name, state]]), - case Infos of - [] -> undefined; - [[{name, Res}, {state, State}]] -> State - end. - -kill_queue_hard(Node, QName) -> - case kill_queue(Node, QName) of - crashed -> ok; - _NewPid -> timer:sleep(100), - kill_queue_hard(Node, QName) - end. - -kill_queue(Node, QName) -> - Pid1 = queue_pid(Node, QName), - exit(Pid1, boom), - await_new_pid(Node, QName, Pid1). 
- -queue_pid(Node, QName) -> - Q = lookup(Node, QName), - QPid = amqqueue:get_pid(Q), - State = amqqueue:get_state(Q), - #resource{virtual_host = VHost} = amqqueue:get_name(Q), - case State of - crashed -> - case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of - {error, {queue_supervisor_not_found, _}} -> {error, no_sup}; - {ok, SPid} -> - case sup_child(Node, SPid) of - {ok, _} -> QPid; %% restarting - {error, no_child} -> crashed %% given up - end - end; - _ -> QPid - end. - -sup_child(Node, Sup) -> - case rpc:call(Node, supervisor, which_children, [Sup]) of - [{_, Child, _, _}] -> {ok, Child}; - [] -> {error, no_child}; - {badrpc, {'EXIT', {noproc, _}}} -> {error, no_sup} - end. - lookup(Node, QName) -> {ok, Q} = rpc:call(Node, rabbit_amqqueue, lookup, [rabbit_misc:r(<<"/">>, queue, QName)]), Q. -await_new_pid(Node, QName, OldPid) -> - case queue_pid(Node, QName) of - OldPid -> timer:sleep(10), - await_new_pid(Node, QName, OldPid); - New -> New - end. - assert_message_count(Count, Ch, QName) -> #'queue.declare_ok'{message_count = Count} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, diff --git a/deps/rabbit_common/src/rabbit_misc.erl b/deps/rabbit_common/src/rabbit_misc.erl index c9df60806426..08c288a66ba6 100644 --- a/deps/rabbit_common/src/rabbit_misc.erl +++ b/deps/rabbit_common/src/rabbit_misc.erl @@ -20,7 +20,8 @@ -export([method_record_type/1, polite_pause/0, polite_pause/1]). -export([die/1, frame_error/2, amqp_error/4, quit/1, - protocol_error/3, protocol_error/4, protocol_error/1]). + protocol_error/3, protocol_error/4, protocol_error/1, + precondition_failed/1, precondition_failed/2]). -export([type_class/1, assert_args_equivalence/4, assert_field_equivalence/4]). -export([table_lookup/2, set_table_value/4, amqp_table/1, to_amqp_table/1]). -export([r/3, r/2, r_arg/4, rs/1]). @@ -87,6 +88,7 @@ maps_put_truthy/3, maps_put_falsy/3 ]). +-export([remote_sup_child/2]). 
%% Horrible macro to use in guards
 -define(IS_BENIGN_EXIT(R),
@@ -253,6 +255,8 @@
 
 -spec group_proplists_by(fun((proplists:proplist()) -> any()),
                          list(proplists:proplist())) -> list(list(proplists:proplist())).
+-spec precondition_failed(string()) -> no_return().
+-spec precondition_failed(string(), [any()]) -> no_return().
 
 %%----------------------------------------------------------------------------
 
@@ -286,6 +290,11 @@ protocol_error(Name, ExplanationFormat, Params, Method) ->
 protocol_error(#amqp_error{} = Error) ->
     exit(Error).
 
+precondition_failed(Format) -> precondition_failed(Format, []). %% no format arguments
+
+precondition_failed(Format, Params) -> %% never returns (see spec): delegates to protocol_error/3
+    protocol_error(precondition_failed, Format, Params).
+
 type_class(byte) -> int;
 type_class(short) -> int;
 type_class(signedint) -> int;
@@ -1441,18 +1450,10 @@ safe_ets_update_element(Tab, Key, ElementSpec, OnSuccess, OnFailure) ->
             false
     end.
 
-%% not exported by supervisor
--type supervisor_child_id() :: term().
--type supervisor_sup_ref() :: (Name :: atom())
-                            | {Name :: atom(), Node :: node()}
-                            | {'global', Name :: atom()}
-                            | {'via', Module :: module(), Name :: any()}
-                            | pid().
-
 %% this used to be in supervisor2
 -spec find_child(Supervisor, Name) -> [pid()] when
-      Supervisor :: supervisor_sup_ref(),
-      Name :: supervisor_child_id().
+      Supervisor :: rabbit_types:sup_ref(),
+      Name :: rabbit_types:child_id().
 find_child(Supervisor, Name) ->
     [Pid || {Name1, Pid, _Type, _Modules} <- supervisor:which_children(Supervisor),
             Name1 =:= Name].
@@ -1631,3 +1632,11 @@ maps_put_falsy(K, false, M) ->
     maps:put(K, false, M);
 maps_put_falsy(_K, _V, M) ->
     M.
+
+-spec remote_sup_child(node(), rabbit_types:sup_ref()) -> rabbit_types:ok_or_error2(rabbit_types:child(), no_child | no_sup).
+remote_sup_child(Node, Sup) -> %% asks supervisor Sup on Node for its children over rpc
+    case rpc:call(Node, supervisor, which_children, [Sup]) of
+        [{_, Child, _, _}] -> {ok, Child}; %% exactly one child
+        [] -> {error, no_child}; %% the supervisor has no children
+        {badrpc, {'EXIT', {noproc, _}}} -> {error, no_sup} %% no such supervisor process
+    end. %% NOTE(review): several children or any other badrpc raise case_clause - confirm intended
diff --git a/deps/rabbit_common/src/rabbit_types.erl b/deps/rabbit_common/src/rabbit_types.erl
index 40a7e82008fd..e58811c6c58d 100644
--- a/deps/rabbit_common/src/rabbit_types.erl
+++ b/deps/rabbit_common/src/rabbit_types.erl
@@ -30,7 +30,8 @@
               node_type/0, topic_access_context/0,
               authz_data/0, authz_context/0,
               permission_atom/0, rabbit_amqqueue_name/0, binding_key/0, channel_number/0,
-              exchange_name/0, exchange_type/0, guid/0, routing_key/0]).
+              exchange_name/0, exchange_type/0, guid/0, routing_key/0,
+              sup_ref/0, child/0, child_id/0]).
 
 -type(option(T) :: T | 'none' | 'undefined').
 %% Deprecated, 'maybe' is a keyword in modern Erlang
@@ -216,3 +217,13 @@
 -type(authz_context() :: map()).
 
 -type(permission_atom() :: 'configure' | 'write' | 'read').
+
+%% Reference to a supervisor process; declared here because the equivalent
+%% type is not exported by OTP supervisor
+-type sup_ref() :: (Name :: atom())
+                 | {Name :: atom(), Node :: node()}
+                 | {'global', Name :: term()}
+                 | {'via', Module :: module(), Name :: any()}
+                 | pid().
+
+-type child() :: 'undefined' | pid(). %% child value returned by rabbit_misc:remote_sup_child/2
+-type child_id() :: term(). %% child name as matched by rabbit_misc:find_child/2
diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/delete_queue_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/delete_queue_command.ex
index 9811bc2c56e2..3aabdbf760fb 100644
--- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/delete_queue_command.ex
+++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/delete_queue_command.ex
@@ -57,7 +57,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.DeleteQueueCommand do
         :rabbit_misc.rpc_call(
           node,
           :rabbit_amqqueue,
-          :delete,
+          :delete_with,
           [queue, if_unused, if_empty, "cli_user"],
           timeout
         )
diff --git a/deps/rabbitmq_cli/test/ctl/delete_queue_command_test.exs b/deps/rabbitmq_cli/test/ctl/delete_queue_command_test.exs
index cbf68c64d298..6e9036dd6501 100644
--- a/deps/rabbitmq_cli/test/ctl/delete_queue_command_test.exs
+++ b/deps/rabbitmq_cli/test/ctl/delete_queue_command_test.exs
@@ -76,6 +76,42 @@ defmodule DeleteQueueCommandTest do
     {:error, :not_found} = lookup_queue(q, @vhost)
   end
 
+  @tag test_timeout: 30000
+  test "run: request to an existing crashed queue on active node succeeds", context do
+    add_vhost(@vhost)
+    set_permissions(@user, @vhost, [".*", ".*", ".*"])
+    on_exit(context, fn -> delete_vhost(@vhost) end)
+
+    q = "foo"
+    n = 20
+
+    declare_queue(q, @vhost, true)
+    publish_messages(@vhost, q, n)
+    q_resource = :rabbit_misc.r(@vhost, :queue, q)
+    crash_queue(q_resource)
+
+    assert @command.run([q], context[:opts]) == {:ok, 0}
+    {:error, :not_found} = lookup_queue(q, @vhost)
+  end
+
+  @tag test_timeout: 30000
+  test "run: request to an existing stopped queue on active node succeeds", context do
+    add_vhost(@vhost)
+    set_permissions(@user, @vhost, [".*", ".*", ".*"])
+    on_exit(context, fn -> delete_vhost(@vhost) end)
+
+    q = "bar"
+    n = 20
+
+    declare_queue(q, @vhost, true)
+    publish_messages(@vhost, q, n)
+    q_resource = :rabbit_misc.r(@vhost, :queue, q)
+    stop_queue(q_resource)
+
+    assert @command.run([q], context[:opts]) == {:ok, 0}
+    {:error, :not_found} = lookup_queue(q, @vhost)
+  end
+
   @tag test_timeout: 30000
   test "run: request to a non-existent queue on active node returns not found", context do
     assert @command.run(["non-existent"], context[:opts]) == {:error, :not_found}
diff --git a/deps/rabbitmq_cli/test/rabbitmqctl_test.exs b/deps/rabbitmq_cli/test/rabbitmqctl_test.exs
index e2468aa1a272..411fc154da8b 100644
--- a/deps/rabbitmq_cli/test/rabbitmqctl_test.exs
+++ b/deps/rabbitmq_cli/test/rabbitmqctl_test.exs
@@ -168,7 +168,7 @@ defmodule RabbitMQCtlTest do
     capture_io(:stderr, fn -> error_check(command, exit_dataerr()) end)
   end
 
-  test "a mcommand with an unsupported option as the first command-line arg fails gracefully" do
+  test "a command with an unsupported option as the first command-line arg fails gracefully" do
     command1 = ["--invalid=true", "list_permissions", "-p", "/"]
 
     assert capture_io(:stderr, fn ->
diff --git a/deps/rabbitmq_cli/test/test_helper.exs b/deps/rabbitmq_cli/test/test_helper.exs
index fe4ecf150c13..7573bd81b9aa 100644
--- a/deps/rabbitmq_cli/test/test_helper.exs
+++ b/deps/rabbitmq_cli/test/test_helper.exs
@@ -567,6 +567,42 @@ defmodule TestHelper do
     assert {:error, :econnrefused} == AMQP.Connection.open(virtual_host: vhost)
   end
 
+  # Kills the queue process via rabbit_amqqueue:kill_queue_hard/2, then checks
+  # that the queue is reported as crashed.
+  def crash_queue(queue_resource = {:resource, vhost, :queue, queue_name}) do
+    node = get_rabbit_hostname()
+
+    # fail right away if the queue could not be killed instead of waiting for
+    # await_state to time out
+    :ok =
+      :rabbit_misc.rpc_call(node, :rabbit_amqqueue, :kill_queue_hard, [node, queue_resource])
+
+    :ok =
+      :rabbit_misc.rpc_call(node, :rabbit_amqqueue_control, :await_state, [
+        node,
+        queue_resource,
+        :crashed
+      ])
+
+    {:existing, existing_amqqueue} = declare_queue(queue_name, vhost, true)
+    :crashed = :amqqueue.get_state(existing_amqqueue)
+  end
+
+  # Shuts the queue process down and checks that the queue is reported as stopped.
+  def stop_queue(queue_resource = {:resource, vhost, :queue, queue_name}) do
+    node = get_rabbit_hostname()
+
+    :ok =
+      :rabbit_misc.rpc_call(node, :rabbit_amqqueue, :kill_queue_hard, [
+        node,
+        queue_resource,
+        :shutdown
+      ])
+
+    {:existing, existing_amqqueue} = declare_queue(queue_name, vhost, true)
+    :stopped = :amqqueue.get_state(existing_amqqueue)
+  end
+
   def delete_all_queues() do
     try do
       immediately_delete_all_queues(:rabbit_amqqueue.list())
diff --git a/moduleindex.yaml b/moduleindex.yaml
index 31e7a75ffa90..52c85ffcfd89 100755
--- a/moduleindex.yaml
+++ b/moduleindex.yaml
@@ -584,6 +584,7 @@ rabbit:
 - rabbit_access_control
 - rabbit_alarm
 - rabbit_amqqueue
+- rabbit_amqqueue_control
 - rabbit_amqqueue_process
 - rabbit_amqqueue_sup
 - rabbit_amqqueue_sup_sup