From d899459b6874b24c1030e244a5cb8f4bf02a0c0a Mon Sep 17 00:00:00 2001 From: Ayanda-D Date: Wed, 6 Sep 2023 13:06:43 +0100 Subject: [PATCH 01/17] implement rabbit_amqqueue:delete_with/{4,6} api --- deps/rabbit/src/rabbit_amqqueue.erl | 47 +++++++++++++++++++++++++- deps/rabbit_common/src/rabbit_misc.erl | 10 +++++- 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 6ffff0edae19..8e1c08d219eb 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -53,7 +53,7 @@ -export([delete_crashed/1, delete_crashed/2, delete_crashed_internal/2]). - +-export([delete_with/4, delete_with/6]). -export([pid_of/1, pid_of/2]). -export([mark_local_durable_queues_stopped/1]). @@ -1601,6 +1601,51 @@ delete_immediately_by_resource(Resources) -> delete(Q, IfUnused, IfEmpty, ActingUser) -> rabbit_queue_type:delete(Q, IfUnused, IfEmpty, ActingUser). +-spec delete_with(amqqueue:amqqueue() | name(), boolean(), boolean(), rabbit_types:username()) -> + rabbit_types:ok(integer()) | rabbit_channel:channel_or_connection_exit(). +delete_with(QueueName, IfUnused, IfEmpty, ActingUser) -> + delete_with(QueueName, undefined, IfUnused, IfEmpty, ActingUser, false). + +-spec delete_with(amqqueue:amqqueue() | name(), pid(), boolean(), boolean(), rabbit_types:username(), boolean()) -> + rabbit_types:ok(integer()) | rabbit_channel:channel_or_connection_exit(). +delete_with(AMQQueue, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive) when ?is_amqqueue(AMQQueue) -> + QueueName = amqqueue:get_name(AMQQueue), + delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive); +delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive) when is_record(QueueName, resource) -> + case with( + QueueName, + fun (Q) -> + if CheckExclusive -> + check_exclusive_access(Q, ConnPid); + true -> + ok + end, + rabbit_queue_type:delete(Q, IfUnused, IfEmpty, Username) + end, + fun (not_found) -> + {ok, 0}; + ({absent, Q, crashed}) -> + _ = rabbit_classic_queue:delete_crashed(Q, Username), + {ok, 0}; + ({absent, Q, stopped}) -> + _ = rabbit_classic_queue:delete_crashed(Q, Username), + {ok, 0}; + ({absent, Q, Reason}) -> + absent(Q, Reason) + end) of + {error, in_use} -> + rabbit_misc:precondition_failed("~ts in use", [rabbit_misc:rs(QueueName)]); + {error, not_empty} -> + rabbit_misc:precondition_failed("~ts not empty", [rabbit_misc:rs(QueueName)]); + {error, {exit, _, _}} -> + %% rabbit_amqqueue:delete()/delegate:invoke might return {error, {exit, _, _}} + {ok, 0}; + {ok, Count} -> + {ok, Count}; + {protocol_error, Type, Reason, ReasonArgs} -> + rabbit_misc:protocol_error(Type, Reason, ReasonArgs) + end. + %% delete_crashed* INCLUDED FOR BACKWARDS COMPATBILITY REASONS delete_crashed(Q) when ?amqqueue_is_classic(Q) -> rabbit_classic_queue:delete_crashed(Q). diff --git a/deps/rabbit_common/src/rabbit_misc.erl b/deps/rabbit_common/src/rabbit_misc.erl index c9df60806426..436150e0ea6a 100644 --- a/deps/rabbit_common/src/rabbit_misc.erl +++ b/deps/rabbit_common/src/rabbit_misc.erl @@ -20,7 +20,8 @@ -export([method_record_type/1, polite_pause/0, polite_pause/1]). -export([die/1, frame_error/2, amqp_error/4, quit/1, - protocol_error/3, protocol_error/4, protocol_error/1]). + protocol_error/3, protocol_error/4, protocol_error/1, + precondition_failed/1, precondition_failed/2]). -export([type_class/1, assert_args_equivalence/4, assert_field_equivalence/4]). -export([table_lookup/2, set_table_value/4, amqp_table/1, to_amqp_table/1]). -export([r/3, r/2, r_arg/4, rs/1]). @@ -253,6 +254,8 @@ -spec group_proplists_by(fun((proplists:proplist()) -> any()), list(proplists:proplist())) -> list(list(proplists:proplist())). +-spec precondition_failed(string()) -> no_return(). +-spec precondition_failed(string(), [any()]) -> no_return(). %%---------------------------------------------------------------------------- @@ -286,6 +289,11 @@ protocol_error(Name, ExplanationFormat, Params, Method) -> protocol_error(#amqp_error{} = Error) -> exit(Error). +precondition_failed(Format) -> precondition_failed(Format, []). + +precondition_failed(Format, Params) -> + protocol_error(precondition_failed, Format, Params). + type_class(byte) -> int; type_class(short) -> int; type_class(signedint) -> int; From 63165c01cc8f56c7072097dd9a0eb0515e265470 Mon Sep 17 00:00:00 2001 From: Ayanda-D Date: Wed, 6 Sep 2023 13:09:40 +0100 Subject: [PATCH 02/17] update channel to use delete_with on queue.delete handling --- deps/rabbit/src/rabbit_channel.erl | 65 +++++++----------------------- 1 file changed, 14 insertions(+), 51 deletions(-) diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index fa97da627527..fa85e1d2268a 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -930,15 +930,6 @@ handle_exception(Reason, State = #ch{cfg = #conf{protocol = Protocol, {stop, normal, State1} end. --spec precondition_failed(string()) -> no_return(). - -precondition_failed(Format) -> precondition_failed(Format, []). - --spec precondition_failed(string(), [any()]) -> no_return(). - -precondition_failed(Format, Params) -> - rabbit_misc:protocol_error(precondition_failed, Format, Params). - return_queue_declare_ok(#resource{name = ActualName}, NoWait, MessageCount, ConsumerCount, #ch{cfg = Cfg} = State) -> @@ -995,7 +986,7 @@ check_user_id_header(#'P_basic'{user_id = Claimed}, tags = Tags}}}) -> case lists:member(impersonator, Tags) of true -> ok; - false -> precondition_failed( + false -> rabbit_misc:precondition_failed( "user_id property set to '~ts' but authenticated user was " "'~ts'", [Claimed, Actual]) end. @@ -1003,7 +994,7 @@ check_user_id_header(#'P_basic'{user_id = Claimed}, check_expiration_header(Props) -> case rabbit_basic:parse_expiration(Props) of {ok, _} -> ok; - {error, E} -> precondition_failed("invalid expiration '~ts': ~tp", + {error, E} -> rabbit_misc:precondition_failed("invalid expiration '~ts': ~tp", [Props#'P_basic'.expiration, E]) end. @@ -1074,7 +1065,7 @@ check_msg_size(Content, MaxMessageSize, GCThreshold) -> _ -> "message size ~B is larger than configured max size ~B" end, - precondition_failed(ErrorMessage, + rabbit_misc:precondition_failed(ErrorMessage, [Size, MaxMessageSize]); _ -> ok end. @@ -1082,7 +1073,7 @@ check_msg_size(Content, MaxMessageSize, GCThreshold) -> check_vhost_queue_limit(#resource{name = QueueName}, VHost) -> case rabbit_vhost_limit:is_over_queue_limit(VHost) of false -> ok; - {true, Limit} -> precondition_failed("cannot declare queue '~ts': " + {true, Limit} -> rabbit_misc:precondition_failed("cannot declare queue '~ts': " "queue limit in vhost '~ts' (~tp) is reached", [QueueName, VHost, Limit]) @@ -1704,7 +1695,7 @@ handle_method(#'queue.purge'{nowait = NoWait} = Method, end; handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) -> - precondition_failed("cannot switch from confirm to tx mode"); + rabbit_misc:precondition_failed("cannot switch from confirm to tx mode"); handle_method(#'tx.select'{}, _, State = #ch{tx = none}) -> {reply, #'tx.select_ok'{}, State#ch{tx = new_tx()}}; @@ -1713,7 +1704,7 @@ handle_method(#'tx.select'{}, _, State) -> {reply, #'tx.select_ok'{}, State}; handle_method(#'tx.commit'{}, _, #ch{tx = none}) -> - precondition_failed("channel is not transactional"); + rabbit_misc:precondition_failed("channel is not transactional"); handle_method(#'tx.commit'{}, _, State = #ch{tx = {Deliveries, Acks}, limiter = Limiter}) -> @@ -1731,7 +1722,7 @@ handle_method(#'tx.commit'{}, _, State = #ch{tx = {Deliveries, Acks}, {noreply, maybe_complete_tx(State3#ch{tx = committing})}; handle_method(#'tx.rollback'{}, _, #ch{tx = none}) -> - precondition_failed("channel is not transactional"); + rabbit_misc:precondition_failed("channel is not transactional"); handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, tx = {_Msgs, Acks}}) -> @@ -1741,7 +1732,7 @@ handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, tx = new_tx()}}; handle_method(#'confirm.select'{}, _, #ch{tx = {_, _}}) -> - precondition_failed("cannot switch from tx to confirm mode"); + rabbit_misc:precondition_failed("cannot switch from tx to confirm mode"); handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> return_ok(State#ch{confirm_enabled = true}, @@ -1762,7 +1753,7 @@ handle_method(#'basic.credit'{consumer_tag = CTag, {ok, {Q, _CParams}} -> {ok, QStates, Actions} = rabbit_queue_type:credit(Q, CTag, Credit, Drain, QStates0), {noreply, handle_queue_actions(Actions, State#ch{queue_states = QStates})}; - error -> precondition_failed( + error -> rabbit_misc:precondition_failed( "unknown consumer tag '~ts'", [CTag]) end; @@ -2050,7 +2041,7 @@ collect_acks(AcknowledgedAcc, RemainingAcc, UAMQ, DeliveryTag, Multiple) -> UAMQTail, DeliveryTag, Multiple) end; {empty, _} -> - precondition_failed("unknown delivery tag ~w", [DeliveryTag]) + rabbit_misc:precondition_failed("unknown delivery tag ~w", [DeliveryTag]) end. %% Settles (acknowledges) messages at the queue replica process level. @@ -2540,7 +2531,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, undefined -> ok; {error, {invalid_type, Type}} -> - precondition_failed( + rabbit_misc:precondition_failed( "invalid type '~ts' for arg '~ts' in ~ts", [Type, DlxKey, rabbit_misc:rs(QueueName)]); DLX -> @@ -2605,35 +2596,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, QueueName = qbin_to_resource(StrippedQueueNameBin, VHostPath), check_configure_permitted(QueueName, User, AuthzContext), - case rabbit_amqqueue:with( - QueueName, - fun (Q) -> - rabbit_amqqueue:check_exclusive_access(Q, ConnPid), - rabbit_queue_type:delete(Q, IfUnused, IfEmpty, Username) - end, - fun (not_found) -> - {ok, 0}; - ({absent, Q, crashed}) -> - _ = rabbit_classic_queue:delete_crashed(Q, Username), - {ok, 0}; - ({absent, Q, stopped}) -> - _ = rabbit_classic_queue:delete_crashed(Q, Username), - {ok, 0}; - ({absent, Q, Reason}) -> - rabbit_amqqueue:absent(Q, Reason) - end) of - {error, in_use} -> - precondition_failed("~ts in use", [rabbit_misc:rs(QueueName)]); - {error, not_empty} -> - precondition_failed("~ts not empty", [rabbit_misc:rs(QueueName)]); - {error, {exit, _, _}} -> - %% rabbit_amqqueue:delete()/delegate:invoke might return {error, {exit, _, _}} - {ok, 0}; - {ok, Count} -> - {ok, Count}; - {protocol_error, Type, Reason, ReasonArgs} -> - rabbit_misc:protocol_error(Type, Reason, ReasonArgs) - end; + rabbit_amqqueue:delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, true); handle_method(#'exchange.delete'{exchange = ExchangeNameBin, if_unused = IfUnused}, _ConnPid, AuthzContext, _CollectorPid, VHostPath, @@ -2647,7 +2610,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, {error, not_found} -> ok; {error, in_use} -> - precondition_failed("~ts in use", [rabbit_misc:rs(ExchangeName)]); + rabbit_misc:precondition_failed("~ts in use", [rabbit_misc:rs(ExchangeName)]); ok -> ok end; @@ -2689,7 +2652,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, case rabbit_misc:r_arg(VHostPath, exchange, Args, AeKey) of undefined -> ok; {error, {invalid_type, Type}} -> - precondition_failed( + rabbit_misc:precondition_failed( "invalid type '~ts' for arg '~ts' in ~ts", [Type, AeKey, rabbit_misc:rs(ExchangeName)]); AName -> check_read_permitted(ExchangeName, User, AuthzContext), From 84e924b8542eb610fd901c84f28f47110253f801 Mon Sep 17 00:00:00 2001 From: Ayanda-D Date: Wed, 6 Sep 2023 13:11:28 +0100 Subject: [PATCH 03/17] update ctl delete_queue command to use delete_with api --- .../lib/rabbitmq/cli/ctl/commands/delete_queue_command.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/delete_queue_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/delete_queue_command.ex index 9811bc2c56e2..3aabdbf760fb 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/delete_queue_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/delete_queue_command.ex @@ -57,7 +57,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.DeleteQueueCommand do :rabbit_misc.rpc_call( node, :rabbit_amqqueue, - :delete, + :delete_with, [queue, if_unused, if_empty, "cli_user"], timeout ) From f0a29c95a5354fdf25c463011f59bc1727e7c074 Mon Sep 17 00:00:00 2001 From: Ayanda-D Date: Wed, 6 Sep 2023 14:09:18 +0100 Subject: [PATCH 04/17] make kill_queue/{2,3} and kill_queue_hard/{2,3} from crashing_queues_SUITE reusable --- deps/rabbit/src/rabbit_amqqueue.erl | 52 ++++++++++++ deps/rabbit/test/crashing_queues_SUITE.erl | 80 ++----------------- .../rabbit_common/src/rabbit_control_misc.erl | 38 +++++++++ 3 files changed, 96 insertions(+), 74 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 8e1c08d219eb..59c02cba01d1 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -55,6 +55,7 @@ delete_crashed_internal/2]). -export([delete_with/4, delete_with/6]). -export([pid_of/1, pid_of/2]). +-export([pid_or_crashed/3]). -export([mark_local_durable_queues_stopped/1]). -export([rebalance/3]). @@ -71,6 +72,8 @@ -export([prepend_extra_bcc/1]). -export([queue/1, queue_names/1]). +-export([kill_queue/2, kill_queue/3, kill_queue_hard/2, kill_queue_hard/3]). + %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2, @@ -2106,3 +2109,52 @@ is_queue_args_combination_permitted(Durable, Exclusive) -> true -> rabbit_deprecated_features:is_permitted(transient_nonexcl_queues) end. + +-spec kill_queue_hard(rabbit_types:vhost(), rabbit_amqqueue:name()) -> ok. +kill_queue_hard(Node, QName) -> + kill_queue_hard(Node, <<"/">>, QName). + +-spec kill_queue_hard(node(), rabbit_types:vhost(), rabbit_amqqueue:name()) -> ok. +kill_queue_hard(Node, VHost, QName) -> + case kill_queue(Node, VHost, QName) of + crashed -> ok; + _NewPid -> timer:sleep(100), + kill_queue_hard(Node, VHost, QName) + end. + +-spec kill_queue(rabbit_types:vhost(), name()) -> ok. +kill_queue(Node, QName) -> + kill_queue(Node, <<"/">>, QName). + +-spec kill_queue(node(), rabbit_types:vhost(), name()) -> ok. +kill_queue(Node, VHost, QName) -> + Pid1 = pid_or_crashed(Node, VHost, QName), + exit(Pid1, boom), + rabbit_control_misc:await_new_pid(Node, VHost, QName, Pid1). + +-spec pid_or_crashed(node(), rabbit_types:vhost(), name()) -> pid() | crashed. +pid_or_crashed(Node, VHost, QName) -> + QResource = rabbit_misc:r(VHost, queue, QName), + {ok, Q} = rpc:call(Node, rabbit_amqqueue, lookup, [QResource]), + QPid = amqqueue:get_pid(Q), + State = amqqueue:get_state(Q), + #resource{virtual_host = VHost} = amqqueue:get_name(Q), + case State of + crashed -> + case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of + {error, {queue_supervisor_not_found, _}} -> {error, no_sup}; + {ok, SPid} -> + case sup_child(Node, SPid) of + {ok, _} -> QPid; %% restarting + {error, no_child} -> crashed %% given up + end + end; + _ -> QPid + end. + +sup_child(Node, Sup) -> + case rpc:call(Node, supervisor, which_children, [Sup]) of + [{_, Child, _, _}] -> {ok, Child}; + [] -> {error, no_child}; + {badrpc, {'EXIT', {noproc, _}}} -> {error, no_sup} + end. diff --git a/deps/rabbit/test/crashing_queues_SUITE.erl b/deps/rabbit/test/crashing_queues_SUITE.erl index ffd73f2fc89f..b1129f0cef7a 100644 --- a/deps/rabbit/test/crashing_queues_SUITE.erl +++ b/deps/rabbit/test/crashing_queues_SUITE.erl @@ -96,7 +96,7 @@ test_queue_failure(Node, Ch, RaceConn, MsgCount, FollowerCount, Decl) -> publish(Ch, QName, transient), publish(Ch, QName, durable), Racer = spawn_declare_racer(RaceConn, Decl), - kill_queue(Node, QName), + rabbit_amqqueue:kill_queue(Node, QName), assert_message_count(MsgCount, Ch, QName), assert_follower_count(FollowerCount, Node, QName), stop_declare_racer(Racer) @@ -112,21 +112,21 @@ give_up_after_repeated_crashes(Config) -> amqp_channel:call(ChA, #'confirm.select'{}), amqp_channel:call(ChA, #'queue.declare'{queue = QName, durable = true}), - await_state(A, QName, running), + rabbit_control_misc:await_state(A, QName, running), publish(ChA, QName, durable), - kill_queue_hard(A, QName), + rabbit_amqqueue:kill_queue_hard(A, QName), {'EXIT', _} = (catch amqp_channel:call( ChA, #'queue.declare'{queue = QName, durable = true})), - await_state(A, QName, crashed), + rabbit_control_misc:await_state(A, QName, crashed), amqp_channel:call(ChB, #'queue.delete'{queue = QName}), amqp_channel:call(ChB, #'queue.declare'{queue = QName, durable = true}), - await_state(A, QName, running), + rabbit_control_misc:await_state(A, QName, running), %% Since it's convenient, also test absent queue status here. rabbit_ct_broker_helpers:stop_node(Config, B), - await_state(A, QName, down), + rabbit_control_misc:await_state(A, QName, down), ok. @@ -172,79 +172,11 @@ declare_racer_loop(Parent, Conn, Decl) -> declare_racer_loop(Parent, Conn, Decl) end. -await_state(Node, QName, State) -> - await_state(Node, QName, State, 30000). - -await_state(Node, QName, State, Time) -> - case state(Node, QName) of - State -> - ok; - Other -> - case Time of - 0 -> exit({timeout_awaiting_state, State, Other}); - _ -> timer:sleep(100), - await_state(Node, QName, State, Time - 100) - end - end. - -state(Node, QName) -> - V = <<"/">>, - Res = rabbit_misc:r(V, queue, QName), - Infos = rpc:call(Node, rabbit_amqqueue, info_all, [V, [name, state]]), - case Infos of - [] -> undefined; - [[{name, Res}, {state, State}]] -> State - end. - -kill_queue_hard(Node, QName) -> - case kill_queue(Node, QName) of - crashed -> ok; - _NewPid -> timer:sleep(100), - kill_queue_hard(Node, QName) - end. - -kill_queue(Node, QName) -> - Pid1 = queue_pid(Node, QName), - exit(Pid1, boom), - await_new_pid(Node, QName, Pid1). - -queue_pid(Node, QName) -> - Q = lookup(Node, QName), - QPid = amqqueue:get_pid(Q), - State = amqqueue:get_state(Q), - #resource{virtual_host = VHost} = amqqueue:get_name(Q), - case State of - crashed -> - case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of - {error, {queue_supervisor_not_found, _}} -> {error, no_sup}; - {ok, SPid} -> - case sup_child(Node, SPid) of - {ok, _} -> QPid; %% restarting - {error, no_child} -> crashed %% given up - end - end; - _ -> QPid - end. - -sup_child(Node, Sup) -> - case rpc:call(Node, supervisor, which_children, [Sup]) of - [{_, Child, _, _}] -> {ok, Child}; - [] -> {error, no_child}; - {badrpc, {'EXIT', {noproc, _}}} -> {error, no_sup} - end. - lookup(Node, QName) -> {ok, Q} = rpc:call(Node, rabbit_amqqueue, lookup, [rabbit_misc:r(<<"/">>, queue, QName)]), Q. -await_new_pid(Node, QName, OldPid) -> - case queue_pid(Node, QName) of - OldPid -> timer:sleep(10), - await_new_pid(Node, QName, OldPid); - New -> New - end. - assert_message_count(Count, Ch, QName) -> #'queue.declare_ok'{message_count = Count} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, diff --git a/deps/rabbit_common/src/rabbit_control_misc.erl b/deps/rabbit_common/src/rabbit_control_misc.erl index 6b7b09c64deb..9f39bd701fdf 100644 --- a/deps/rabbit_common/src/rabbit_control_misc.erl +++ b/deps/rabbit_common/src/rabbit_control_misc.erl @@ -10,6 +10,7 @@ -export([emitting_map/4, emitting_map/5, emitting_map_with_exit_handler/4, emitting_map_with_exit_handler/5, wait_for_info_messages/6, spawn_emitter_caller/7, await_emitters_termination/1, + await_new_pid/4, await_state/3, await_state/4, await_state/5, print_cmd_result/2]). -spec emitting_map(pid(), reference(), fun(), list()) -> 'ok'. @@ -25,6 +26,10 @@ InitialAcc :: term(), Acc :: term(), OK :: {ok, Acc}, Err :: {error, term()}. -spec spawn_emitter_caller(node(), module(), atom(), [term()], reference(), pid(), timeout()) -> 'ok'. -spec await_emitters_termination([pid()]) -> 'ok'. +-spec await_new_pid(node(), rabbit_types:vhost(), rabbit_amqqueue:name(), pid()) -> pid(). +-spec await_state(node(), rabbit_amqqueue:name(), atom()) -> 'ok'. +-spec await_state(node(), rabbit_types:vhost(), rabbit_amqqueue:name(), atom()) -> 'ok'. +-spec await_state(node(), rabbit_types:vhost(), rabbit_amqqueue:name(), atom(), integer()) -> 'ok'. -spec print_cmd_result(atom(), term()) -> 'ok'. @@ -177,3 +182,36 @@ notify_if_timeout(Pid, Ref, Timeout) -> print_cmd_result(authenticate_user, _Result) -> io:format("Success~n"); print_cmd_result(join_cluster, already_member) -> io:format("The node is already a member of this cluster~n"). + +await_new_pid(Node, VHost, QName, OldPid) -> + case rabbit_amqqueue:pid_or_crashed(Node, VHost, QName) of + OldPid -> timer:sleep(10), + await_new_pid(Node, VHost, QName, OldPid); + New -> New + end. + +await_state(Node, QName, State) -> + await_state(Node, <<"/">>, QName, State). + +await_state(Node, VHost, QName, State) -> + await_state(Node, VHost, QName, State, 30000). + +await_state(Node, VHost, QName, State, Time) -> + case state(Node, VHost, QName) of + State -> + ok; + Other -> + case Time of + 0 -> exit({timeout_awaiting_state, State, Other}); + _ -> timer:sleep(100), + await_state(Node, VHost, QName, State, Time - 100) + end + end. + +state(Node, VHost, QName) -> + Res = rabbit_misc:r(VHost, queue, QName), + Infos = rpc:call(Node, rabbit_amqqueue, info_all, [VHost, [name, state]]), + case Infos of + [] -> undefined; + [[{name, Res}, {state, State}]] -> State + end. \ No newline at end of file From dd900b4c980835dfe47947112918c68752c38f61 Mon Sep 17 00:00:00 2001 From: Ayanda-D Date: Wed, 6 Sep 2023 14:13:15 +0100 Subject: [PATCH 05/17] fix unrelated typo --- deps/rabbitmq_cli/test/rabbitmqctl_test.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbitmq_cli/test/rabbitmqctl_test.exs b/deps/rabbitmq_cli/test/rabbitmqctl_test.exs index e2468aa1a272..411fc154da8b 100644 --- a/deps/rabbitmq_cli/test/rabbitmqctl_test.exs +++ b/deps/rabbitmq_cli/test/rabbitmqctl_test.exs @@ -168,7 +168,7 @@ defmodule RabbitMQCtlTest do capture_io(:stderr, fn -> error_check(command, exit_dataerr()) end) end - test "a mcommand with an unsupported option as the first command-line arg fails gracefully" do + test "a command with an unsupported option as the first command-line arg fails gracefully" do command1 = ["--invalid=true", "list_permissions", "-p", "/"] assert capture_io(:stderr, fn -> From 3502929b9fcb2e0ae2baf7c227b4f9b10f3c5c05 Mon Sep 17 00:00:00 2001 From: Ayanda-D Date: Wed, 6 Sep 2023 14:15:03 +0100 Subject: [PATCH 06/17] ctl delete_queue command test case for a crashed queue --- .../test/ctl/delete_queue_command_test.exs | 18 ++++++++++++++++++ deps/rabbitmq_cli/test/test_helper.exs | 11 +++++++++++ 2 files changed, 29 insertions(+) diff --git a/deps/rabbitmq_cli/test/ctl/delete_queue_command_test.exs b/deps/rabbitmq_cli/test/ctl/delete_queue_command_test.exs index cbf68c64d298..6a8a5888ffbe 100644 --- a/deps/rabbitmq_cli/test/ctl/delete_queue_command_test.exs +++ b/deps/rabbitmq_cli/test/ctl/delete_queue_command_test.exs @@ -76,6 +76,24 @@ defmodule DeleteQueueCommandTest do {:error, :not_found} = lookup_queue(q, @vhost) end + @tag test_timeout: 30000 + test "run: request to an existing crashed queue on active node succeeds", context do + add_vhost(@vhost) + set_permissions(@user, @vhost, [".*", ".*", ".*"]) + on_exit(context, fn -> delete_vhost(@vhost) end) + + q = "foo" + n = 20 + + declare_queue(q, @vhost, true) + publish_messages(@vhost, q, n) + q_resource = :rabbit_misc.r(@vhost, :queue, q) + crash_queue(q_resource) + + assert @command.run([q], context[:opts]) == {:ok, 0} + {:error, :not_found} = lookup_queue(q, @vhost) + end + @tag test_timeout: 30000 test "run: request to a non-existent queue on active node returns not found", context do assert @command.run(["non-existent"], context[:opts]) == {:error, :not_found} diff --git a/deps/rabbitmq_cli/test/test_helper.exs b/deps/rabbitmq_cli/test/test_helper.exs index fe4ecf150c13..d7e03b5d7890 100644 --- a/deps/rabbitmq_cli/test/test_helper.exs +++ b/deps/rabbitmq_cli/test/test_helper.exs @@ -567,6 +567,17 @@ defmodule TestHelper do assert {:error, :econnrefused} == AMQP.Connection.open(virtual_host: vhost) end + def crash_queue(queue_resource = {:resource, vhost, :queue, queue_name}) do + node = get_rabbit_hostname() + + :rabbit_misc.rpc_call(node, :rabbit_amqqueue, :kill_queue_hard, [node, vhost, queue_name]) + + :ok = :rabbit_misc.rpc_call(node, :rabbit_control_misc, :await_state, [node, vhost, queue_name, :crashed]) + + {:existing, existing_amqqueue} = declare_queue(queue_name, vhost, true) + :crashed = :amqqueue.get_state(existing_amqqueue) + end + def delete_all_queues() do try do immediately_delete_all_queues(:rabbit_amqqueue.list()) From 5717bcd09d1ace4df24153412df1750269c4ebbc Mon Sep 17 00:00:00 2001 From: Ayanda-D Date: Wed, 6 Sep 2023 16:43:25 +0100 Subject: [PATCH 07/17] refactor and move remote_sup_child/2 to rabbit_misc --- deps/rabbit/src/rabbit_amqqueue.erl | 9 +-------- deps/rabbit_common/src/rabbit_misc.erl | 9 +++++++++ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 59c02cba01d1..03cd4f6e2352 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -2144,17 +2144,10 @@ pid_or_crashed(Node, VHost, QName) -> case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of {error, {queue_supervisor_not_found, _}} -> {error, no_sup}; {ok, SPid} -> - case sup_child(Node, SPid) of + case rabbit_misc:remote_sup_child(Node, SPid) of {ok, _} -> QPid; %% restarting {error, no_child} -> crashed %% given up end end; _ -> QPid end. - -sup_child(Node, Sup) -> - case rpc:call(Node, supervisor, which_children, [Sup]) of - [{_, Child, _, _}] -> {ok, Child}; - [] -> {error, no_child}; - {badrpc, {'EXIT', {noproc, _}}} -> {error, no_sup} - end. diff --git a/deps/rabbit_common/src/rabbit_misc.erl b/deps/rabbit_common/src/rabbit_misc.erl index 436150e0ea6a..a1908c507742 100644 --- a/deps/rabbit_common/src/rabbit_misc.erl +++ b/deps/rabbit_common/src/rabbit_misc.erl @@ -88,6 +88,7 @@ maps_put_truthy/3, maps_put_falsy/3 ]). +-export([remote_sup_child/2]). %% Horrible macro to use in guards -define(IS_BENIGN_EXIT(R), @@ -1639,3 +1640,11 @@ maps_put_falsy(K, false, M) -> maps:put(K, false, M); maps_put_falsy(_K, _V, M) -> M. + +-spec remote_sup_child(node(), supervisor:sup_ref()) -> rabbit_types:ok_or_error2(supervisor:child(), no_child | no_sup). +remote_sup_child(Node, Sup) -> + case rpc:call(Node, supervisor, which_children, [Sup]) of + [{_, Child, _, _}] -> {ok, Child}; + [] -> {error, no_child}; + {badrpc, {'EXIT', {noproc, _}}} -> {error, no_sup} + end. From 3ac283d13ea7e4f0c6323e16db053aade3f79e59 Mon Sep 17 00:00:00 2001 From: Ayanda-D Date: Thu, 7 Sep 2023 10:17:40 +0100 Subject: [PATCH 08/17] use queue resource names for new queue operations (changes the api specs) --- deps/rabbit/src/rabbit_amqqueue.erl | 40 ++++++++----------- deps/rabbit/test/crashing_queues_SUITE.erl | 6 ++- .../rabbit_common/src/rabbit_control_misc.erl | 40 ++++++++++--------- deps/rabbitmq_cli/test/test_helper.exs | 4 +- 4 files changed, 44 insertions(+), 46 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 03cd4f6e2352..56796f50053d 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -55,7 +55,7 @@ delete_crashed_internal/2]). -export([delete_with/4, delete_with/6]). -export([pid_of/1, pid_of/2]). --export([pid_or_crashed/3]). +-export([pid_or_crashed/2]). -export([mark_local_durable_queues_stopped/1]). -export([rebalance/3]). @@ -72,7 +72,7 @@ -export([prepend_extra_bcc/1]). -export([queue/1, queue_names/1]). --export([kill_queue/2, kill_queue/3, kill_queue_hard/2, kill_queue_hard/3]). +-export([kill_queue/2, kill_queue/3, kill_queue_hard/2]). %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, @@ -2110,35 +2110,29 @@ is_queue_args_combination_permitted(Durable, Exclusive) -> rabbit_deprecated_features:is_permitted(transient_nonexcl_queues) end. --spec kill_queue_hard(rabbit_types:vhost(), rabbit_amqqueue:name()) -> ok. -kill_queue_hard(Node, QName) -> - kill_queue_hard(Node, <<"/">>, QName). - --spec kill_queue_hard(node(), rabbit_types:vhost(), rabbit_amqqueue:name()) -> ok. -kill_queue_hard(Node, VHost, QName) -> - case kill_queue(Node, VHost, QName) of +-spec kill_queue_hard(node(), name()) -> ok. +kill_queue_hard(Node, QRes = #resource{kind = queue}) -> + case kill_queue(Node, QRes) of crashed -> ok; _NewPid -> timer:sleep(100), - kill_queue_hard(Node, VHost, QName) + kill_queue_hard(Node, QRes) end. --spec kill_queue(rabbit_types:vhost(), name()) -> ok. -kill_queue(Node, QName) -> - kill_queue(Node, <<"/">>, QName). +-spec kill_queue(node(), name()) -> ok. +kill_queue(Node, QRes) -> + kill_queue(Node, QRes, boom). --spec kill_queue(node(), rabbit_types:vhost(), name()) -> ok. -kill_queue(Node, VHost, QName) -> - Pid1 = pid_or_crashed(Node, VHost, QName), - exit(Pid1, boom), - rabbit_control_misc:await_new_pid(Node, VHost, QName, Pid1). +-spec kill_queue(node(), name(), atom()) -> ok. +kill_queue(Node, QRes = #resource{kind = queue}, Reason) -> + Pid1 = pid_or_crashed(Node, QRes), + exit(Pid1, Reason), + rabbit_control_misc:await_new_pid(Node, QRes, Pid1). --spec pid_or_crashed(node(), rabbit_types:vhost(), name()) -> pid() | crashed. -pid_or_crashed(Node, VHost, QName) -> - QResource = rabbit_misc:r(VHost, queue, QName), - {ok, Q} = rpc:call(Node, rabbit_amqqueue, lookup, [QResource]), +-spec pid_or_crashed(node(), name()) -> pid() | crashed. +pid_or_crashed(Node, QRes = #resource{virtual_host = VHost, kind = queue}) -> + {ok, Q} = rpc:call(Node, rabbit_amqqueue, lookup, [QRes]), QPid = amqqueue:get_pid(Q), State = amqqueue:get_state(Q), - #resource{virtual_host = VHost} = amqqueue:get_name(Q), case State of crashed -> case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of diff --git a/deps/rabbit/test/crashing_queues_SUITE.erl b/deps/rabbit/test/crashing_queues_SUITE.erl index b1129f0cef7a..4bf4bf339735 100644 --- a/deps/rabbit/test/crashing_queues_SUITE.erl +++ b/deps/rabbit/test/crashing_queues_SUITE.erl @@ -96,7 +96,8 @@ test_queue_failure(Node, Ch, RaceConn, MsgCount, FollowerCount, Decl) -> publish(Ch, QName, transient), publish(Ch, QName, durable), Racer = spawn_declare_racer(RaceConn, Decl), - rabbit_amqqueue:kill_queue(Node, QName), + QRes = rabbit_misc:r(<<"/">>, queue, QName), + rabbit_amqqueue:kill_queue(Node, QRes), assert_message_count(MsgCount, Ch, QName), assert_follower_count(FollowerCount, Node, QName), stop_declare_racer(Racer) @@ -114,7 +115,8 @@ give_up_after_repeated_crashes(Config) -> durable = true}), rabbit_control_misc:await_state(A, QName, running), publish(ChA, QName, durable), - rabbit_amqqueue:kill_queue_hard(A, QName), + QRes = rabbit_misc:r(<<"/">>, queue, QName), + rabbit_amqqueue:kill_queue_hard(A, QRes), {'EXIT', _} = (catch amqp_channel:call( ChA, #'queue.declare'{queue = QName, durable = true})), diff --git a/deps/rabbit_common/src/rabbit_control_misc.erl b/deps/rabbit_common/src/rabbit_control_misc.erl index 9f39bd701fdf..ac71f7505feb 100644 --- a/deps/rabbit_common/src/rabbit_control_misc.erl +++ b/deps/rabbit_common/src/rabbit_control_misc.erl @@ -7,10 +7,12 @@ -module(rabbit_control_misc). +-include_lib("rabbit_common/include/resource.hrl"). + -export([emitting_map/4, emitting_map/5, emitting_map_with_exit_handler/4, emitting_map_with_exit_handler/5, wait_for_info_messages/6, spawn_emitter_caller/7, await_emitters_termination/1, - await_new_pid/4, await_state/3, await_state/4, await_state/5, + await_new_pid/3, await_state/3, await_state/4, print_cmd_result/2]). -spec emitting_map(pid(), reference(), fun(), list()) -> 'ok'. @@ -26,13 +28,14 @@ InitialAcc :: term(), Acc :: term(), OK :: {ok, Acc}, Err :: {error, term()}. -spec spawn_emitter_caller(node(), module(), atom(), [term()], reference(), pid(), timeout()) -> 'ok'. -spec await_emitters_termination([pid()]) -> 'ok'. --spec await_new_pid(node(), rabbit_types:vhost(), rabbit_amqqueue:name(), pid()) -> pid(). --spec await_state(node(), rabbit_amqqueue:name(), atom()) -> 'ok'. --spec await_state(node(), rabbit_types:vhost(), rabbit_amqqueue:name(), atom()) -> 'ok'. --spec await_state(node(), rabbit_types:vhost(), rabbit_amqqueue:name(), atom(), integer()) -> 'ok'. +-spec await_new_pid(node(), rabbit_amqqueue:name(), pid()) -> pid(). +-spec await_state(node(), rabbit_amqqueue:name() | binary(), atom()) -> 'ok'. +-spec await_state(node(), rabbit_amqqueue:name(), atom(), integer()) -> 'ok'. -spec print_cmd_result(atom(), term()) -> 'ok'. +-define(DEFAULT_AWAIT_STATE_TIMEOUT, 30000). + emitting_map(AggregatorPid, Ref, Fun, List) -> emitting_map(AggregatorPid, Ref, Fun, List, continue), AggregatorPid ! {Ref, finished}, @@ -183,35 +186,34 @@ notify_if_timeout(Pid, Ref, Timeout) -> print_cmd_result(authenticate_user, _Result) -> io:format("Success~n"); print_cmd_result(join_cluster, already_member) -> io:format("The node is already a member of this cluster~n"). -await_new_pid(Node, VHost, QName, OldPid) -> - case rabbit_amqqueue:pid_or_crashed(Node, VHost, QName) of +await_new_pid(Node, QRes = #resource{kind = queue}, OldPid) -> + case rabbit_amqqueue:pid_or_crashed(Node, QRes) of OldPid -> timer:sleep(10), - await_new_pid(Node, VHost, QName, OldPid); + await_new_pid(Node, QRes, OldPid); New -> New end. -await_state(Node, QName, State) -> - await_state(Node, <<"/">>, QName, State). - -await_state(Node, VHost, QName, State) -> - await_state(Node, VHost, QName, State, 30000). +await_state(Node, QName, State) when is_binary(QName) -> + QRes = rabbit_misc:r(<<"/">>, queue, QName), + await_state(Node, QRes, State); +await_state(Node, QRes = #resource{kind = queue}, State) -> + await_state(Node, QRes, State, ?DEFAULT_AWAIT_STATE_TIMEOUT). -await_state(Node, VHost, QName, State, Time) -> - case state(Node, VHost, QName) of +await_state(Node, QRes = #resource{kind = queue}, State, Time) -> + case state(Node, QRes) of State -> ok; Other -> case Time of 0 -> exit({timeout_awaiting_state, State, Other}); _ -> timer:sleep(100), - await_state(Node, VHost, QName, State, Time - 100) + await_state(Node, QRes, State, Time - 100) end end. -state(Node, VHost, QName) -> - Res = rabbit_misc:r(VHost, queue, QName), +state(Node, QRes = #resource{virtual_host = VHost}) -> Infos = rpc:call(Node, rabbit_amqqueue, info_all, [VHost, [name, state]]), case Infos of [] -> undefined; - [[{name, Res}, {state, State}]] -> State + [[{name, QRes}, {state, State}]] -> State end. \ No newline at end of file diff --git a/deps/rabbitmq_cli/test/test_helper.exs b/deps/rabbitmq_cli/test/test_helper.exs index d7e03b5d7890..cef220fb1a9c 100644 --- a/deps/rabbitmq_cli/test/test_helper.exs +++ b/deps/rabbitmq_cli/test/test_helper.exs @@ -570,9 +570,9 @@ defmodule TestHelper do def crash_queue(queue_resource = {:resource, vhost, :queue, queue_name}) do node = get_rabbit_hostname() - :rabbit_misc.rpc_call(node, :rabbit_amqqueue, :kill_queue_hard, [node, vhost, queue_name]) + :rabbit_misc.rpc_call(node, :rabbit_amqqueue, :kill_queue_hard, [node, queue_resource]) - :ok = :rabbit_misc.rpc_call(node, :rabbit_control_misc, :await_state, [node, vhost, queue_name, :crashed]) + :ok = :rabbit_misc.rpc_call(node, :rabbit_control_misc, :await_state, [node, queue_resource, :crashed]) {:existing, existing_amqqueue} = declare_queue(queue_name, vhost, true) :crashed = :amqqueue.get_state(existing_amqqueue) From 7daaee8da05506cec46cbede972d4b1d5359b03e Mon Sep 17 00:00:00 2001 From: Ayanda-D Date: Thu, 7 Sep 2023 10:19:32 +0100 Subject: [PATCH 09/17] formatting --- deps/rabbit_common/src/rabbit_control_misc.erl | 2 +- deps/rabbitmq_cli/lib/rabbitmqctl.ex | 2 +- deps/rabbitmq_cli/test/test_helper.exs | 7 ++++++- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/deps/rabbit_common/src/rabbit_control_misc.erl b/deps/rabbit_common/src/rabbit_control_misc.erl index ac71f7505feb..4cbc2b3a9eea 100644 --- a/deps/rabbit_common/src/rabbit_control_misc.erl +++ b/deps/rabbit_common/src/rabbit_control_misc.erl @@ -216,4 +216,4 @@ state(Node, QRes = #resource{virtual_host = VHost}) -> case Infos of [] -> undefined; [[{name, QRes}, {state, State}]] -> State - end. \ No newline at end of file + end. diff --git a/deps/rabbitmq_cli/lib/rabbitmqctl.ex b/deps/rabbitmq_cli/lib/rabbitmqctl.ex index fc9fd2321f39..42ce92c567b5 100644 --- a/deps/rabbitmq_cli/lib/rabbitmqctl.ex +++ b/deps/rabbitmq_cli/lib/rabbitmqctl.ex @@ -636,7 +636,7 @@ defmodule RabbitMQCtl do ## {:fun, fun} - run a custom function to enable distribution. ## custom mode is usefult for commands which should have specific node name. ## Runs code if distribution is successful, or not needed. - @spec maybe_with_distribution(module(), options(), (-> command_result())) :: command_result() + @spec maybe_with_distribution(module(), options(), (() -> command_result())) :: command_result() defp maybe_with_distribution(command, options, code) do try do maybe_with_distribution_without_catch(command, options, code) diff --git a/deps/rabbitmq_cli/test/test_helper.exs b/deps/rabbitmq_cli/test/test_helper.exs index cef220fb1a9c..97e2782a6fdc 100644 --- a/deps/rabbitmq_cli/test/test_helper.exs +++ b/deps/rabbitmq_cli/test/test_helper.exs @@ -572,7 +572,12 @@ defmodule TestHelper do :rabbit_misc.rpc_call(node, :rabbit_amqqueue, :kill_queue_hard, [node, queue_resource]) - :ok = :rabbit_misc.rpc_call(node, :rabbit_control_misc, :await_state, [node, queue_resource, :crashed]) + :ok = + :rabbit_misc.rpc_call(node, :rabbit_control_misc, :await_state, [ + node, + queue_resource, + :crashed + ]) {:existing, existing_amqqueue} = declare_queue(queue_name, vhost, true) :crashed = :amqqueue.get_state(existing_amqqueue) From a21354043a9fcc02c5d90da3fabbcde7013f9d54 Mon Sep 17 00:00:00 2001 From: Ayanda-D Date: Thu, 7 Sep 2023 11:07:48 +0100 Subject: [PATCH 10/17] be extra defensive on the kill_queue/2 api --- deps/rabbit/src/rabbit_amqqueue.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 56796f50053d..c09da20af6b8 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -2119,7 +2119,7 @@ kill_queue_hard(Node, QRes = #resource{kind = queue}) -> end. -spec kill_queue(node(), name()) -> ok. -kill_queue(Node, QRes) -> +kill_queue(Node, QRes = #resource{kind = queue}) -> kill_queue(Node, QRes, boom). -spec kill_queue(node(), name(), atom()) -> ok. From 0674cdf4b24c9c3ce9f6f8153fae25c0263fdad7 Mon Sep 17 00:00:00 2001 From: Ayanda-D Date: Thu, 7 Sep 2023 11:30:40 +0100 Subject: [PATCH 11/17] make rabbit_control_misc:await_state/4 api more consistent on the queue name arg types it accepts --- deps/rabbit_common/src/rabbit_control_misc.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/deps/rabbit_common/src/rabbit_control_misc.erl b/deps/rabbit_common/src/rabbit_control_misc.erl index 4cbc2b3a9eea..ea7c7e5c47b4 100644 --- a/deps/rabbit_common/src/rabbit_control_misc.erl +++ b/deps/rabbit_common/src/rabbit_control_misc.erl @@ -30,7 +30,7 @@ -spec await_emitters_termination([pid()]) -> 'ok'. -spec await_new_pid(node(), rabbit_amqqueue:name(), pid()) -> pid(). -spec await_state(node(), rabbit_amqqueue:name() | binary(), atom()) -> 'ok'. --spec await_state(node(), rabbit_amqqueue:name(), atom(), integer()) -> 'ok'. +-spec await_state(node(), rabbit_amqqueue:name() | binary(), atom(), integer()) -> 'ok'. -spec print_cmd_result(atom(), term()) -> 'ok'. @@ -199,6 +199,9 @@ await_state(Node, QName, State) when is_binary(QName) -> await_state(Node, QRes = #resource{kind = queue}, State) -> await_state(Node, QRes, State, ?DEFAULT_AWAIT_STATE_TIMEOUT). +await_state(Node, QName, State, Time) when is_binary(QName) -> + QRes = rabbit_misc:r(<<"/">>, queue, QName), + await_state(Node, QRes, State, Time); await_state(Node, QRes = #resource{kind = queue}, State, Time) -> case state(Node, QRes) of State -> From 4e8474a5e070a10ef1b0434a69bb58d4cad92fc3 Mon Sep 17 00:00:00 2001 From: Ayanda-D Date: Thu, 7 Sep 2023 11:48:43 +0100 Subject: [PATCH 12/17] use descriptive macros instead of magic numbers in await_new_pid/3 and await_state/4 --- deps/rabbit_common/src/rabbit_control_misc.erl | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/deps/rabbit_common/src/rabbit_control_misc.erl b/deps/rabbit_common/src/rabbit_control_misc.erl index ea7c7e5c47b4..9df4fd99be59 100644 --- a/deps/rabbit_common/src/rabbit_control_misc.erl +++ b/deps/rabbit_common/src/rabbit_control_misc.erl @@ -34,7 +34,10 @@ -spec print_cmd_result(atom(), term()) -> 'ok'. --define(DEFAULT_AWAIT_STATE_TIMEOUT, 30000). +-define(DEFAULT_AWAIT_STATE_TIMEOUT, 30000). +-define(AWAIT_NEW_PID_DELAY_INTERVAL, 10). +-define(AWAIT_STATE_DELAY_INTERVAL, 100). +-define(AWAIT_STATE_DELAY_TIME_DELTA, 100). emitting_map(AggregatorPid, Ref, Fun, List) -> emitting_map(AggregatorPid, Ref, Fun, List, continue), @@ -188,7 +191,7 @@ print_cmd_result(join_cluster, already_member) -> io:format("The node is already await_new_pid(Node, QRes = #resource{kind = queue}, OldPid) -> case rabbit_amqqueue:pid_or_crashed(Node, QRes) of - OldPid -> timer:sleep(10), + OldPid -> timer:sleep(?AWAIT_NEW_PID_DELAY_INTERVAL), await_new_pid(Node, QRes, OldPid); New -> New end. @@ -209,8 +212,8 @@ await_state(Node, QRes = #resource{kind = queue}, State, Time) -> Other -> case Time of 0 -> exit({timeout_awaiting_state, State, Other}); - _ -> timer:sleep(100), - await_state(Node, QRes, State, Time - 100) + _ -> timer:sleep(?AWAIT_STATE_DELAY_INTERVAL), + await_state(Node, QRes, State, Time - ?AWAIT_STATE_DELAY_TIME_DELTA) end end. From 1097ebe66e6011af65fc1f9875ce8ac0115335bc Mon Sep 17 00:00:00 2001 From: Ayanda-D Date: Thu, 7 Sep 2023 12:13:51 +0100 Subject: [PATCH 13/17] improve error handling for non-existent queues in pid_or_crashed/2 api and update spec --- deps/rabbit/src/rabbit_amqqueue.erl | 32 ++++++++++++++++------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index c09da20af6b8..9153df83f1ba 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -2128,20 +2128,24 @@ kill_queue(Node, QRes = #resource{kind = queue}, Reason) -> exit(Pid1, Reason), rabbit_control_misc:await_new_pid(Node, QRes, Pid1). --spec pid_or_crashed(node(), name()) -> pid() | crashed. +-spec pid_or_crashed(node(), name()) -> pid() | crashed | rabbit_types:error(term()). pid_or_crashed(Node, QRes = #resource{virtual_host = VHost, kind = queue}) -> - {ok, Q} = rpc:call(Node, rabbit_amqqueue, lookup, [QRes]), - QPid = amqqueue:get_pid(Q), - State = amqqueue:get_state(Q), - case State of - crashed -> - case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of - {error, {queue_supervisor_not_found, _}} -> {error, no_sup}; - {ok, SPid} -> - case rabbit_misc:remote_sup_child(Node, SPid) of - {ok, _} -> QPid; %% restarting - {error, no_child} -> crashed %% given up - end + case rpc:call(Node, rabbit_amqqueue, lookup, [QRes]) of + {ok, Q} -> + QPid = amqqueue:get_pid(Q), + State = amqqueue:get_state(Q), + case State of + crashed -> + case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of + {error, {queue_supervisor_not_found, _}} -> {error, no_sup}; + {ok, SPid} -> + case rabbit_misc:remote_sup_child(Node, SPid) of + {ok, _} -> QPid; %% restarting + {error, no_child} -> crashed %% given up + end + end; + _ -> QPid end; - _ -> QPid + Error = {error, _} -> Error; + Reason -> {error, Reason} end. From 290c1c05a49df8eb719b20677994340d4b4f13ea Mon Sep 17 00:00:00 2001 From: Ayanda-D Date: Thu, 7 Sep 2023 17:44:29 +0100 Subject: [PATCH 14/17] add test for ctl delete_queue for a stopped queue --- deps/rabbit/src/rabbit_amqqueue.erl | 23 ++++++++++++++----- .../test/ctl/delete_queue_command_test.exs | 18 +++++++++++++++ deps/rabbitmq_cli/test/test_helper.exs | 13 +++++++++++ 3 files changed, 48 insertions(+), 6 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 9153df83f1ba..c55e631bd07a 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -72,7 +72,7 @@ -export([prepend_extra_bcc/1]). -export([queue/1, queue_names/1]). --export([kill_queue/2, kill_queue/3, kill_queue_hard/2]). +-export([kill_queue/2, kill_queue/3, kill_queue_hard/2, kill_queue_hard/3]). %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, @@ -119,6 +119,7 @@ -define(CONSUMER_INFO_KEYS, [queue_name, channel_pid, consumer_tag, ack_required, prefetch_count, active, activity_status, arguments]). +-define(KILL_QUEUE_DELAY_INTERVAL, 100). warn_file_limit() -> DurableQueues = find_recoverable_queues(), @@ -2112,17 +2113,27 @@ is_queue_args_combination_permitted(Durable, Exclusive) -> -spec kill_queue_hard(node(), name()) -> ok. kill_queue_hard(Node, QRes = #resource{kind = queue}) -> - case kill_queue(Node, QRes) of + kill_queue_hard(Node, QRes, boom). + +-spec kill_queue_hard(node(), name(), atom()) -> ok. +kill_queue_hard(Node, QRes = #resource{kind = queue}, Reason) -> + case kill_queue(Node, QRes, Reason) of crashed -> ok; - _NewPid -> timer:sleep(100), - kill_queue_hard(Node, QRes) + stopped -> ok; + _NewPid -> timer:sleep(?KILL_QUEUE_DELAY_INTERVAL), + kill_queue_hard(Node, QRes, Reason) end. --spec kill_queue(node(), name()) -> ok. +-spec kill_queue(node(), name()) -> pid() | crashed. kill_queue(Node, QRes = #resource{kind = queue}) -> kill_queue(Node, QRes, boom). --spec kill_queue(node(), name(), atom()) -> ok. +-spec kill_queue(node(), name(), atom()) -> pid() | crashed | stopped. +kill_queue(Node, QRes = #resource{kind = queue}, Reason = shutdown) -> + Pid1 = pid_or_crashed(Node, QRes), + exit(Pid1, Reason), + rabbit_control_misc:await_state(Node, QRes, stopped), + stopped; kill_queue(Node, QRes = #resource{kind = queue}, Reason) -> Pid1 = pid_or_crashed(Node, QRes), exit(Pid1, Reason), diff --git a/deps/rabbitmq_cli/test/ctl/delete_queue_command_test.exs b/deps/rabbitmq_cli/test/ctl/delete_queue_command_test.exs index 6a8a5888ffbe..6e9036dd6501 100644 --- a/deps/rabbitmq_cli/test/ctl/delete_queue_command_test.exs +++ b/deps/rabbitmq_cli/test/ctl/delete_queue_command_test.exs @@ -94,6 +94,24 @@ defmodule DeleteQueueCommandTest do {:error, :not_found} = lookup_queue(q, @vhost) end + @tag test_timeout: 30000 + test "run: request to an existing stopped queue on active node succeeds", context do + add_vhost(@vhost) + set_permissions(@user, @vhost, [".*", ".*", ".*"]) + on_exit(context, fn -> delete_vhost(@vhost) end) + + q = "bar" + n = 20 + + declare_queue(q, @vhost, true) + publish_messages(@vhost, q, n) + q_resource = :rabbit_misc.r(@vhost, :queue, q) + stop_queue(q_resource) + + assert @command.run([q], context[:opts]) == {:ok, 0} + {:error, :not_found} = lookup_queue(q, @vhost) + end + @tag test_timeout: 30000 test "run: request to a non-existent queue on active node returns not found", context do assert @command.run(["non-existent"], context[:opts]) == {:error, :not_found} diff --git a/deps/rabbitmq_cli/test/test_helper.exs b/deps/rabbitmq_cli/test/test_helper.exs index 97e2782a6fdc..c8fefcf6dc5a 100644 --- a/deps/rabbitmq_cli/test/test_helper.exs +++ b/deps/rabbitmq_cli/test/test_helper.exs @@ -583,6 +583,19 @@ defmodule TestHelper do :crashed = :amqqueue.get_state(existing_amqqueue) end + def stop_queue(queue_resource = {:resource, vhost, :queue, queue_name}) do + node = get_rabbit_hostname() + + :rabbit_misc.rpc_call(node, :rabbit_amqqueue, :kill_queue_hard, [ + node, + queue_resource, + :shutdown + ]) + + {:existing, existing_amqqueue} = declare_queue(queue_name, vhost, true) + :stopped = :amqqueue.get_state(existing_amqqueue) + end + def delete_all_queues() do try do immediately_delete_all_queues(:rabbit_amqqueue.list()) From 1d163886fde9ea8d6fea840f53a22e9a96bff055 Mon Sep 17 00:00:00 2001 From: Ayanda-D Date: Fri, 8 Sep 2023 10:21:08 +0100 Subject: [PATCH 15/17] new rabbit_amqqueue_control for queue related control operations and fix bazel issues raised in MK's review --- deps/rabbit/app.bzl | 3 + deps/rabbit/src/rabbit_amqqueue.erl | 33 ++++++----- deps/rabbit/src/rabbit_amqqueue_control.erl | 57 +++++++++++++++++++ deps/rabbit/test/crashing_queues_SUITE.erl | 8 +-- .../rabbit_common/src/rabbit_control_misc.erl | 46 --------------- deps/rabbit_common/src/rabbit_misc.erl | 2 +- deps/rabbit_common/src/rabbit_types.erl | 11 +++- deps/rabbitmq_cli/lib/rabbitmqctl.ex | 2 +- deps/rabbitmq_cli/test/test_helper.exs | 2 +- deps/rabbitmq_ct_client_helpers/BUILD.bazel | 1 + deps/rabbitmq_ct_helpers/BUILD.bazel | 1 + deps/rabbitmq_mqtt/BUILD.bazel | 5 +- moduleindex.yaml | 1 + 13 files changed, 104 insertions(+), 68 deletions(-) create mode 100644 deps/rabbit/src/rabbit_amqqueue_control.erl diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index 3eda8b4a4e85..7432ed8694ae 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -47,6 +47,7 @@ def all_beam_files(name = "all_beam_files"): "src/rabbit_access_control.erl", "src/rabbit_alarm.erl", "src/rabbit_amqqueue.erl", + "src/rabbit_amqqueue_control.erl", "src/rabbit_amqqueue_process.erl", "src/rabbit_amqqueue_sup.erl", "src/rabbit_amqqueue_sup_sup.erl", @@ -297,6 +298,7 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/rabbit_access_control.erl", "src/rabbit_alarm.erl", "src/rabbit_amqqueue.erl", + "src/rabbit_amqqueue_control.erl", "src/rabbit_amqqueue_process.erl", "src/rabbit_amqqueue_sup.erl", "src/rabbit_amqqueue_sup_sup.erl", @@ -559,6 +561,7 @@ def all_srcs(name = "all_srcs"): "src/rabbit_access_control.erl", "src/rabbit_alarm.erl", "src/rabbit_amqqueue.erl", + "src/rabbit_amqqueue_control.erl", "src/rabbit_amqqueue_process.erl", "src/rabbit_amqqueue_sup.erl", "src/rabbit_amqqueue_sup_sup.erl", diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index c55e631bd07a..991ba55aee9b 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -1606,12 +1606,12 @@ delete(Q, IfUnused, IfEmpty, ActingUser) -> rabbit_queue_type:delete(Q, IfUnused, IfEmpty, ActingUser). -spec delete_with(amqqueue:amqqueue() | name(), boolean(), boolean(), rabbit_types:username()) -> - rabbit_types:ok(integer()) | rabbit_channel:channel_or_connection_exit(). + rabbit_types:ok(integer()) | rabbit_misc:channel_or_connection_exit(). delete_with(QueueName, IfUnused, IfEmpty, ActingUser) -> delete_with(QueueName, undefined, IfUnused, IfEmpty, ActingUser, false). --spec delete_with(amqqueue:amqqueue() | name(), pid(), boolean(), boolean(), rabbit_types:username(), boolean()) -> - rabbit_types:ok(integer()) | rabbit_channel:channel_or_connection_exit(). +-spec delete_with(amqqueue:amqqueue() | name(), pid() | undefined, boolean(), boolean(), rabbit_types:username(), boolean()) -> + rabbit_types:ok(integer()) | rabbit_misc:channel_or_connection_exit(). delete_with(AMQQueue, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive) when ?is_amqqueue(AMQQueue) -> QueueName = amqqueue:get_name(AMQQueue), delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive); @@ -1629,10 +1629,10 @@ delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive) whe fun (not_found) -> {ok, 0}; ({absent, Q, crashed}) -> - _ = rabbit_classic_queue:delete_crashed(Q, Username), + _ = delete_crashed(Q, Username), {ok, 0}; ({absent, Q, stopped}) -> - _ = rabbit_classic_queue:delete_crashed(Q, Username), + _ = delete_crashed(Q, Username), {ok, 0}; ({absent, Q, Reason}) -> absent(Q, Reason) @@ -2120,24 +2120,31 @@ kill_queue_hard(Node, QRes = #resource{kind = queue}, Reason) -> case kill_queue(Node, QRes, Reason) of crashed -> ok; stopped -> ok; - _NewPid -> timer:sleep(?KILL_QUEUE_DELAY_INTERVAL), - kill_queue_hard(Node, QRes, Reason) + NewPid when is_pid(NewPid) -> timer:sleep(?KILL_QUEUE_DELAY_INTERVAL), + kill_queue_hard(Node, QRes, Reason); + Error -> Error end. --spec kill_queue(node(), name()) -> pid() | crashed. +-spec kill_queue(node(), name()) -> pid() | crashed | stopped | rabbit_types:error(term()). kill_queue(Node, QRes = #resource{kind = queue}) -> kill_queue(Node, QRes, boom). --spec kill_queue(node(), name(), atom()) -> pid() | crashed | stopped. +-spec kill_queue(node(), name(), atom()) -> pid() | crashed | stopped | rabbit_types:error(term()). kill_queue(Node, QRes = #resource{kind = queue}, Reason = shutdown) -> Pid1 = pid_or_crashed(Node, QRes), exit(Pid1, Reason), - rabbit_control_misc:await_state(Node, QRes, stopped), + rabbit_amqqueue_control:await_state(Node, QRes, stopped), stopped; kill_queue(Node, QRes = #resource{kind = queue}, Reason) -> - Pid1 = pid_or_crashed(Node, QRes), - exit(Pid1, Reason), - rabbit_control_misc:await_new_pid(Node, QRes, Pid1). + case pid_or_crashed(Node, QRes) of + Pid1 when is_pid(Pid1) -> + exit(Pid1, Reason), + rabbit_amqqueue_control:await_new_pid(Node, QRes, Pid1); + crashed -> + crashed; + Error -> + Error + end. -spec pid_or_crashed(node(), name()) -> pid() | crashed | rabbit_types:error(term()). pid_or_crashed(Node, QRes = #resource{virtual_host = VHost, kind = queue}) -> diff --git a/deps/rabbit/src/rabbit_amqqueue_control.erl b/deps/rabbit/src/rabbit_amqqueue_control.erl new file mode 100644 index 000000000000..d38e878d85bb --- /dev/null +++ b/deps/rabbit/src/rabbit_amqqueue_control.erl @@ -0,0 +1,57 @@ +% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_amqqueue_control). + +-export([await_new_pid/3, await_state/3, await_state/4]). + +-define(DEFAULT_AWAIT_STATE_TIMEOUT, 30000). +-define(AWAIT_NEW_PID_DELAY_INTERVAL, 10). +-define(AWAIT_STATE_DELAY_INTERVAL, 100). +-define(AWAIT_STATE_DELAY_TIME_DELTA, 100). + +-include_lib("rabbit_common/include/resource.hrl"). + +-spec await_new_pid(node(), rabbit_amqqueue:name(), pid()) -> pid(). +await_new_pid(Node, QRes = #resource{kind = queue}, OldPid) -> + case rabbit_amqqueue:pid_or_crashed(Node, QRes) of + OldPid -> timer:sleep(?AWAIT_NEW_PID_DELAY_INTERVAL), + await_new_pid(Node, QRes, OldPid); + New -> New + end. + +-spec await_state(node(), rabbit_amqqueue:name() | binary(), atom()) -> 'ok'. +await_state(Node, QName, State) when is_binary(QName) -> + QRes = rabbit_misc:r(<<"/">>, queue, QName), + await_state(Node, QRes, State); +await_state(Node, QRes = #resource{kind = queue}, State) -> + await_state(Node, QRes, State, ?DEFAULT_AWAIT_STATE_TIMEOUT). + +-spec await_state(node(), rabbit_amqqueue:name() | binary(), atom(), integer()) -> 'ok'. +await_state(Node, QName, State, Time) when is_binary(QName) -> + QRes = rabbit_misc:r(<<"/">>, queue, QName), + await_state(Node, QRes, State, Time); +await_state(Node, QRes = #resource{kind = queue}, State, Time) -> + case state(Node, QRes) of + State -> + ok; + Other -> + case Time of + 0 -> exit({timeout_awaiting_state, State, Other}); + _ -> timer:sleep(?AWAIT_STATE_DELAY_INTERVAL), + await_state(Node, QRes, State, Time - ?AWAIT_STATE_DELAY_TIME_DELTA) + end + end. + +state(Node, QRes = #resource{virtual_host = VHost, kind = queue}) -> + Infos = rpc:call(Node, rabbit_amqqueue, info_all, [VHost, [name, state]]), + fetch_state(QRes, Infos). + +fetch_state(_QRes, []) -> undefined; +fetch_state(QRes, [[{name, QRes}, {state, State}] | _]) -> State; +fetch_state(QRes, [[{name, _}, {state, _State}] | Rem]) -> + fetch_state(QRes, Rem). diff --git a/deps/rabbit/test/crashing_queues_SUITE.erl b/deps/rabbit/test/crashing_queues_SUITE.erl index 4bf4bf339735..55fe8acd54b8 100644 --- a/deps/rabbit/test/crashing_queues_SUITE.erl +++ b/deps/rabbit/test/crashing_queues_SUITE.erl @@ -113,22 +113,22 @@ give_up_after_repeated_crashes(Config) -> amqp_channel:call(ChA, #'confirm.select'{}), amqp_channel:call(ChA, #'queue.declare'{queue = QName, durable = true}), - rabbit_control_misc:await_state(A, QName, running), + rabbit_amqqueue_control:await_state(A, QName, running), publish(ChA, QName, durable), QRes = rabbit_misc:r(<<"/">>, queue, QName), rabbit_amqqueue:kill_queue_hard(A, QRes), {'EXIT', _} = (catch amqp_channel:call( ChA, #'queue.declare'{queue = QName, durable = true})), - rabbit_control_misc:await_state(A, QName, crashed), + rabbit_amqqueue_control:await_state(A, QName, crashed), amqp_channel:call(ChB, #'queue.delete'{queue = QName}), amqp_channel:call(ChB, #'queue.declare'{queue = QName, durable = true}), - rabbit_control_misc:await_state(A, QName, running), + rabbit_amqqueue_control:await_state(A, QName, running), %% Since it's convenient, also test absent queue status here. rabbit_ct_broker_helpers:stop_node(Config, B), - rabbit_control_misc:await_state(A, QName, down), + rabbit_amqqueue_control:await_state(A, QName, down), ok. diff --git a/deps/rabbit_common/src/rabbit_control_misc.erl b/deps/rabbit_common/src/rabbit_control_misc.erl index 9df4fd99be59..6b7b09c64deb 100644 --- a/deps/rabbit_common/src/rabbit_control_misc.erl +++ b/deps/rabbit_common/src/rabbit_control_misc.erl @@ -7,12 +7,9 @@ -module(rabbit_control_misc). --include_lib("rabbit_common/include/resource.hrl"). - -export([emitting_map/4, emitting_map/5, emitting_map_with_exit_handler/4, emitting_map_with_exit_handler/5, wait_for_info_messages/6, spawn_emitter_caller/7, await_emitters_termination/1, - await_new_pid/3, await_state/3, await_state/4, print_cmd_result/2]). -spec emitting_map(pid(), reference(), fun(), list()) -> 'ok'. @@ -28,17 +25,9 @@ InitialAcc :: term(), Acc :: term(), OK :: {ok, Acc}, Err :: {error, term()}. -spec spawn_emitter_caller(node(), module(), atom(), [term()], reference(), pid(), timeout()) -> 'ok'. -spec await_emitters_termination([pid()]) -> 'ok'. --spec await_new_pid(node(), rabbit_amqqueue:name(), pid()) -> pid(). --spec await_state(node(), rabbit_amqqueue:name() | binary(), atom()) -> 'ok'. --spec await_state(node(), rabbit_amqqueue:name() | binary(), atom(), integer()) -> 'ok'. -spec print_cmd_result(atom(), term()) -> 'ok'. --define(DEFAULT_AWAIT_STATE_TIMEOUT, 30000). --define(AWAIT_NEW_PID_DELAY_INTERVAL, 10). --define(AWAIT_STATE_DELAY_INTERVAL, 100). --define(AWAIT_STATE_DELAY_TIME_DELTA, 100). - emitting_map(AggregatorPid, Ref, Fun, List) -> emitting_map(AggregatorPid, Ref, Fun, List, continue), AggregatorPid ! {Ref, finished}, @@ -188,38 +177,3 @@ notify_if_timeout(Pid, Ref, Timeout) -> print_cmd_result(authenticate_user, _Result) -> io:format("Success~n"); print_cmd_result(join_cluster, already_member) -> io:format("The node is already a member of this cluster~n"). - -await_new_pid(Node, QRes = #resource{kind = queue}, OldPid) -> - case rabbit_amqqueue:pid_or_crashed(Node, QRes) of - OldPid -> timer:sleep(?AWAIT_NEW_PID_DELAY_INTERVAL), - await_new_pid(Node, QRes, OldPid); - New -> New - end. - -await_state(Node, QName, State) when is_binary(QName) -> - QRes = rabbit_misc:r(<<"/">>, queue, QName), - await_state(Node, QRes, State); -await_state(Node, QRes = #resource{kind = queue}, State) -> - await_state(Node, QRes, State, ?DEFAULT_AWAIT_STATE_TIMEOUT). - -await_state(Node, QName, State, Time) when is_binary(QName) -> - QRes = rabbit_misc:r(<<"/">>, queue, QName), - await_state(Node, QRes, State, Time); -await_state(Node, QRes = #resource{kind = queue}, State, Time) -> - case state(Node, QRes) of - State -> - ok; - Other -> - case Time of - 0 -> exit({timeout_awaiting_state, State, Other}); - _ -> timer:sleep(?AWAIT_STATE_DELAY_INTERVAL), - await_state(Node, QRes, State, Time - ?AWAIT_STATE_DELAY_TIME_DELTA) - end - end. - -state(Node, QRes = #resource{virtual_host = VHost}) -> - Infos = rpc:call(Node, rabbit_amqqueue, info_all, [VHost, [name, state]]), - case Infos of - [] -> undefined; - [[{name, QRes}, {state, State}]] -> State - end. diff --git a/deps/rabbit_common/src/rabbit_misc.erl b/deps/rabbit_common/src/rabbit_misc.erl index a1908c507742..a9aa27b0bfff 100644 --- a/deps/rabbit_common/src/rabbit_misc.erl +++ b/deps/rabbit_common/src/rabbit_misc.erl @@ -1641,7 +1641,7 @@ maps_put_falsy(K, false, M) -> maps_put_falsy(_K, _V, M) -> M. --spec remote_sup_child(node(), supervisor:sup_ref()) -> rabbit_types:ok_or_error2(supervisor:child(), no_child | no_sup). +-spec remote_sup_child(node(), rabbit_types:sup_ref()) -> rabbit_types:ok_or_error2(rabbit_types:child(), no_child | no_sup). remote_sup_child(Node, Sup) -> case rpc:call(Node, supervisor, which_children, [Sup]) of [{_, Child, _, _}] -> {ok, Child}; diff --git a/deps/rabbit_common/src/rabbit_types.erl b/deps/rabbit_common/src/rabbit_types.erl index 40a7e82008fd..1c72ce5d54b5 100644 --- a/deps/rabbit_common/src/rabbit_types.erl +++ b/deps/rabbit_common/src/rabbit_types.erl @@ -30,7 +30,8 @@ node_type/0, topic_access_context/0, authz_data/0, authz_context/0, permission_atom/0, rabbit_amqqueue_name/0, binding_key/0, channel_number/0, - exchange_name/0, exchange_type/0, guid/0, routing_key/0]). + exchange_name/0, exchange_type/0, guid/0, routing_key/0, + sup_ref/0, child/0]). -type(option(T) :: T | 'none' | 'undefined'). %% Deprecated, 'maybe' is a keyword in modern Erlang @@ -216,3 +217,11 @@ -type(authz_context() :: map()). -type(permission_atom() :: 'configure' | 'write' | 'read'). + +-type sup_ref() :: (Name :: atom()) + | {Name :: atom(), Node :: node()} + | {'global', Name :: term()} + | {'via', Module :: module(), Name :: any()} + | pid(). + +-type child() :: 'undefined' | pid(). diff --git a/deps/rabbitmq_cli/lib/rabbitmqctl.ex b/deps/rabbitmq_cli/lib/rabbitmqctl.ex index 42ce92c567b5..fc9fd2321f39 100644 --- a/deps/rabbitmq_cli/lib/rabbitmqctl.ex +++ b/deps/rabbitmq_cli/lib/rabbitmqctl.ex @@ -636,7 +636,7 @@ defmodule RabbitMQCtl do ## {:fun, fun} - run a custom function to enable distribution. ## custom mode is usefult for commands which should have specific node name. ## Runs code if distribution is successful, or not needed. - @spec maybe_with_distribution(module(), options(), (() -> command_result())) :: command_result() + @spec maybe_with_distribution(module(), options(), (-> command_result())) :: command_result() defp maybe_with_distribution(command, options, code) do try do maybe_with_distribution_without_catch(command, options, code) diff --git a/deps/rabbitmq_cli/test/test_helper.exs b/deps/rabbitmq_cli/test/test_helper.exs index c8fefcf6dc5a..7573bd81b9aa 100644 --- a/deps/rabbitmq_cli/test/test_helper.exs +++ b/deps/rabbitmq_cli/test/test_helper.exs @@ -573,7 +573,7 @@ defmodule TestHelper do :rabbit_misc.rpc_call(node, :rabbit_amqqueue, :kill_queue_hard, [node, queue_resource]) :ok = - :rabbit_misc.rpc_call(node, :rabbit_control_misc, :await_state, [ + :rabbit_misc.rpc_call(node, :rabbit_amqqueue_control, :await_state, [ node, queue_resource, :crashed diff --git a/deps/rabbitmq_ct_client_helpers/BUILD.bazel b/deps/rabbitmq_ct_client_helpers/BUILD.bazel index 2bbee7a93b7f..556ff955c7d7 100644 --- a/deps/rabbitmq_ct_client_helpers/BUILD.bazel +++ b/deps/rabbitmq_ct_client_helpers/BUILD.bazel @@ -33,6 +33,7 @@ rabbitmq_app( hdrs = [":public_hdrs"], app_name = "rabbitmq_ct_client_helpers", beam_files = [":beam_files"], + extra_apps = ["rabbit_common"], license_files = [":license_files"], priv = [":priv"], deps = [ diff --git a/deps/rabbitmq_ct_helpers/BUILD.bazel b/deps/rabbitmq_ct_helpers/BUILD.bazel index 2f921ad4f2ea..b157ea7cea50 100644 --- a/deps/rabbitmq_ct_helpers/BUILD.bazel +++ b/deps/rabbitmq_ct_helpers/BUILD.bazel @@ -39,6 +39,7 @@ rabbitmq_app( hdrs = [":public_hdrs"], app_name = "rabbitmq_ct_helpers", beam_files = [":beam_files"], + extra_apps = ["inet_tcp_proxy"], license_files = [":license_files"], priv = [":priv"], deps = [ diff --git a/deps/rabbitmq_mqtt/BUILD.bazel b/deps/rabbitmq_mqtt/BUILD.bazel index 7c2272a82621..04855637fd1b 100644 --- a/deps/rabbitmq_mqtt/BUILD.bazel +++ b/deps/rabbitmq_mqtt/BUILD.bazel @@ -78,7 +78,10 @@ rabbitmq_app( app_module = APP_MODULE, app_name = APP_NAME, beam_files = [":beam_files"], - extra_apps = ["ssl"], + extra_apps = [ + "ssl", + "amqp_client", + ], license_files = [":license_files"], priv = [":priv"], deps = [ diff --git a/moduleindex.yaml b/moduleindex.yaml index 31e7a75ffa90..52c85ffcfd89 100755 --- a/moduleindex.yaml +++ b/moduleindex.yaml @@ -584,6 +584,7 @@ rabbit: - rabbit_access_control - rabbit_alarm - rabbit_amqqueue +- rabbit_amqqueue_control - rabbit_amqqueue_process - rabbit_amqqueue_sup - rabbit_amqqueue_sup_sup From 3bd51f75fb300886fbe3737b5f1d11ab2d0c53ba Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Fri, 8 Sep 2023 17:31:35 +0100 Subject: [PATCH 16/17] try revert extra_apps generated and added on bazel run gazelle to fix failures on ci --- deps/rabbitmq_ct_client_helpers/BUILD.bazel | 1 - deps/rabbitmq_ct_helpers/BUILD.bazel | 1 - deps/rabbitmq_mqtt/BUILD.bazel | 5 +---- 3 files changed, 1 insertion(+), 6 deletions(-) diff --git a/deps/rabbitmq_ct_client_helpers/BUILD.bazel b/deps/rabbitmq_ct_client_helpers/BUILD.bazel index 556ff955c7d7..2bbee7a93b7f 100644 --- a/deps/rabbitmq_ct_client_helpers/BUILD.bazel +++ b/deps/rabbitmq_ct_client_helpers/BUILD.bazel @@ -33,7 +33,6 @@ rabbitmq_app( hdrs = [":public_hdrs"], app_name = "rabbitmq_ct_client_helpers", beam_files = [":beam_files"], - extra_apps = ["rabbit_common"], license_files = [":license_files"], priv = [":priv"], deps = [ diff --git a/deps/rabbitmq_ct_helpers/BUILD.bazel b/deps/rabbitmq_ct_helpers/BUILD.bazel index b157ea7cea50..2f921ad4f2ea 100644 --- a/deps/rabbitmq_ct_helpers/BUILD.bazel +++ b/deps/rabbitmq_ct_helpers/BUILD.bazel @@ -39,7 +39,6 @@ rabbitmq_app( hdrs = [":public_hdrs"], app_name = "rabbitmq_ct_helpers", beam_files = [":beam_files"], - extra_apps = ["inet_tcp_proxy"], license_files = [":license_files"], priv = [":priv"], deps = [ diff --git a/deps/rabbitmq_mqtt/BUILD.bazel b/deps/rabbitmq_mqtt/BUILD.bazel index 04855637fd1b..7c2272a82621 100644 --- a/deps/rabbitmq_mqtt/BUILD.bazel +++ b/deps/rabbitmq_mqtt/BUILD.bazel @@ -78,10 +78,7 @@ rabbitmq_app( app_module = APP_MODULE, app_name = APP_NAME, beam_files = [":beam_files"], - extra_apps = [ - "ssl", - "amqp_client", - ], + extra_apps = ["ssl"], license_files = [":license_files"], priv = [":priv"], deps = [ From 779c9e413968c5716f694ba4b8fe9fcdf03f7844 Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Mon, 11 Sep 2023 10:59:26 +0100 Subject: [PATCH 17/17] formatting and ensure use sup specs in rabbit_types --- deps/rabbit/src/rabbit_amqqueue.erl | 5 +++-- deps/rabbit_common/src/rabbit_misc.erl | 12 ++---------- deps/rabbit_common/src/rabbit_types.erl | 4 +++- 3 files changed, 8 insertions(+), 13 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 991ba55aee9b..f2df0d8695e9 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -2120,8 +2120,9 @@ kill_queue_hard(Node, QRes = #resource{kind = queue}, Reason) -> case kill_queue(Node, QRes, Reason) of crashed -> ok; stopped -> ok; - NewPid when is_pid(NewPid) -> timer:sleep(?KILL_QUEUE_DELAY_INTERVAL), - kill_queue_hard(Node, QRes, Reason); + NewPid when is_pid(NewPid) -> + timer:sleep(?KILL_QUEUE_DELAY_INTERVAL), + kill_queue_hard(Node, QRes, Reason); Error -> Error end. diff --git a/deps/rabbit_common/src/rabbit_misc.erl b/deps/rabbit_common/src/rabbit_misc.erl index a9aa27b0bfff..08c288a66ba6 100644 --- a/deps/rabbit_common/src/rabbit_misc.erl +++ b/deps/rabbit_common/src/rabbit_misc.erl @@ -1450,18 +1450,10 @@ safe_ets_update_element(Tab, Key, ElementSpec, OnSuccess, OnFailure) -> false end. -%% not exported by supervisor --type supervisor_child_id() :: term(). --type supervisor_sup_ref() :: (Name :: atom()) - | {Name :: atom(), Node :: node()} - | {'global', Name :: atom()} - | {'via', Module :: module(), Name :: any()} - | pid(). - %% this used to be in supervisor2 -spec find_child(Supervisor, Name) -> [pid()] when - Supervisor :: supervisor_sup_ref(), - Name :: supervisor_child_id(). + Supervisor :: rabbit_types:sup_ref(), + Name :: rabbit_types:child_id(). find_child(Supervisor, Name) -> [Pid || {Name1, Pid, _Type, _Modules} <- supervisor:which_children(Supervisor), Name1 =:= Name]. diff --git a/deps/rabbit_common/src/rabbit_types.erl b/deps/rabbit_common/src/rabbit_types.erl index 1c72ce5d54b5..e58811c6c58d 100644 --- a/deps/rabbit_common/src/rabbit_types.erl +++ b/deps/rabbit_common/src/rabbit_types.erl @@ -31,7 +31,7 @@ authz_data/0, authz_context/0, permission_atom/0, rabbit_amqqueue_name/0, binding_key/0, channel_number/0, exchange_name/0, exchange_type/0, guid/0, routing_key/0, - sup_ref/0, child/0]). + sup_ref/0, child/0, child_id/0]). -type(option(T) :: T | 'none' | 'undefined'). %% Deprecated, 'maybe' is a keyword in modern Erlang @@ -218,6 +218,7 @@ -type(permission_atom() :: 'configure' | 'write' | 'read'). +%% not exported by OTP supervisor -type sup_ref() :: (Name :: atom()) | {Name :: atom(), Node :: node()} | {'global', Name :: term()} @@ -225,3 +226,4 @@ | pid(). -type child() :: 'undefined' | pid(). +-type child_id() :: term().