From 2c1b75276e4b3b1eff8a6f6f8c99bb1fcfa0ca1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Mon, 29 Sep 2025 15:33:34 +0200 Subject: [PATCH 1/2] rabbit_node_monitor: Notify `rabbit` is live when handling a `nodeup` message [Why] So far, when there was a network partition with Mnesia, the most popular partition handling strategies restarted RabbitMQ nodes. Therefore, `rabbit` would execute the boot steps and one of them would notify other members of the cluster that "this RabbitMQ node is live". With Khepri, nodes are not restarted anymore and thus boot steps are not executed at the end of a network partition. As a consequence, other members are not notified that a member is back online. [How] When the node monitor receives the `nodeup` message (managed by Erlang, meaning that "a remote Erlang node just connected to this node through Erlang distribution"), a `node_up` message is sent to all cluster members (meaning "RabbitMQ is now running on the originating node"). Yeah, very poor naming... This lets the RabbitMQ node monitor know when other nodes running RabbitMQ are back online and react accordingly. If a node is restarted, another node may therefore receive the `node_up` message twice; the actions it triggers must be idempotent. --- deps/rabbit/src/rabbit_node_monitor.erl | 32 +++++++++++++++++-------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/deps/rabbit/src/rabbit_node_monitor.erl b/deps/rabbit/src/rabbit_node_monitor.erl index 5939c156b259..6a04d4cafe54 100644 --- a/deps/rabbit/src/rabbit_node_monitor.erl +++ b/deps/rabbit/src/rabbit_node_monitor.erl @@ -430,16 +430,8 @@ handle_call(status, _From, State = #state{partitions = Partitions}) -> handle_call(_Request, _From, State) -> {noreply, State}. -handle_cast(notify_node_up, State = #state{guid = GUID}) -> - Nodes = rabbit_nodes:list_reachable() -- [node()], - gen_server:abcast(Nodes, ?SERVER, - {node_up, node(), rabbit_db_cluster:node_type(), GUID}), - %% register other active rabbits with this rabbit - DiskNodes = rabbit_db_cluster:disc_members(), - [gen_server:cast(?SERVER, {node_up, N, case lists:member(N, DiskNodes) of - true -> disc; - false -> ram - end}) || N <- Nodes], +handle_cast(notify_node_up, State) -> + do_notify_node_up(State), {noreply, State}; %%---------------------------------------------------------------------------- @@ -665,6 +657,12 @@ handle_info({nodedown, Node, Info}, State) -> handle_info({nodeup, Node, _Info}, State) -> ?LOG_INFO("node ~tp up", [Node]), + %% We notify that `rabbit' is up here too (in addition to the message sent + %% explicitly by a boot step). That's because nodes may go down then up + %% during a network partition, and with Khepri, nodes are not restarted + %% (unlike with some partition handling strategies used with Mnesia), and + %% thus the boot steps are not executed. + do_notify_node_up(State), {noreply, State}; handle_info({mnesia_system_event, @@ -854,6 +852,20 @@ wait_for_cluster_recovery(Condition) -> wait_for_cluster_recovery(Condition) end. +do_notify_node_up(#state{guid = GUID}) -> + Nodes = rabbit_nodes:list_reachable() -- [node()], + gen_server:abcast(Nodes, ?SERVER, + {node_up, node(), rabbit_db_cluster:node_type(), GUID}), + %% register other active rabbits with this rabbit + DiskNodes = rabbit_db_cluster:disc_members(), + _ = [gen_server:cast( + ?SERVER, + {node_up, N, case lists:member(N, DiskNodes) of + true -> disc; + false -> ram + end}) || N <- Nodes], + ok.
+ handle_dead_rabbit(Node, State) -> %% TODO: This may turn out to be a performance hog when there are %% lots of nodes. We really only need to execute some of these From 3c4d0734053e508f4d551510159323aacf8dbd85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Fri, 19 Sep 2025 11:53:40 +0200 Subject: [PATCH 2/2] Keep exclusive/auto-delete queues with Khepri + network partition [Why] With Mnesia, when the network partition strategy is set to `pause_minority`, nodes on the "minority side" are stopped. Thus, the exclusive queues that were hosted by nodes on that minority side are lost: * Consumers connected to these nodes are disconnected because the nodes are stopped. * Queue records on the majority side are deleted from the metadata store. This was acceptable with Mnesia, given how this network partition handling strategy is implemented. However, it does not work with Khepri because the nodes on the "minority side" continue to run and serve clients. Therefore the cluster ends up in an inconsistent situation: 1. The "majority side" deleted the queue records. 2. When the network partition is resolved, the "minority side" gets the record deletion, but the queue processes continue to run. The situation was similar for auto-delete queues. [How] With Khepri, we no longer delete transient queue records simply because a node goes down. Thanks to this, an exclusive or an auto-delete queue and its consumer(s) are not affected by a network partition: they continue to work. However, if a node is really lost, we need to clean up dead queue records. This was already done for durable queues with both Mnesia and Khepri. But with Khepri, transient queue records persist in the store like durable queue records (unlike with Mnesia). That's why this commit changes the clean-up function `rabbit_amqqueue:forget_all_durable/1` into `rabbit_amqqueue:forget_all/1`, which deletes the records of all queues that were hosted on the given node, regardless of whether they are transient or durable. In addition to this, the queue process will spawn a temporary process that will try to delete the underlying record indefinitely if no other process is waiting for a reply from the queue process. That is the case for queues that are deleted because of an internal event (like the exclusive/auto-delete conditions). The queue process will exit, which will notify connections that the queue is gone. Thanks to this, the temporary process will do its best to delete the record in case of a network partition, whether the consumers go away during or after that partition. That said, the node monitor drives some failsafe code that cleans up the record if the queue process was killed before it could delete its own record. Fixes #12949, #12597, #14527.
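To illustrate the [How] above, here is a simplified sketch of the retry loop added to `rabbit_amqqueue_process' in the diff below (in the real code, the loop is scheduled through `worker_pool:submit_async/1' and also converts throws to errors before calling `rabbit_amqqueue:internal_delete/3'):

    %% Runs in a temporary worker, spawned when the queue process terminates
    %% and no caller is waiting for a reply from it (`ReplyTo' is `none').
    infinite_internal_delete(Q, ActingUser, Reason) ->
        case rabbit_amqqueue:internal_delete(Q, ActingUser, Reason) of
            {error, timeout} ->
                %% Most likely a loss of quorum (e.g. a network partition);
                %% block until the local Khepri store catches up, then retry
                %% the deletion.
                _ = rabbit_khepri:fence(infinity),
                infinite_internal_delete(Q, ActingUser, Reason);
            Ret ->
                Ret
        end.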
--- deps/rabbit/src/rabbit_amqqueue.erl | 50 +- deps/rabbit/src/rabbit_amqqueue_process.erl | 69 +- deps/rabbit/src/rabbit_db_queue.erl | 29 +- deps/rabbit/src/rabbit_khepri.erl | 4 +- deps/rabbit/src/rabbit_mnesia.erl | 2 +- deps/rabbit/test/bindings_SUITE.erl | 45 +- .../rabbit/test/clustering_recovery_SUITE.erl | 669 +++++++++++++++++- deps/rabbit/test/rabbit_db_queue_SUITE.erl | 4 +- 8 files changed, 777 insertions(+), 95 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 36f6b63966df..dbaad06bca8a 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -9,7 +9,7 @@ -export([recover/1, stop/1, start/1, declare/6, declare/7, delete_immediately/1, delete_exclusive/2, delete/4, purge/1, - forget_all_durable/1]). + forget_all/1]). -export([pseudo_queue/2, pseudo_queue/3]). -export([exists/1, lookup/1, lookup/2, lookup_durable_queue/1, not_found_or_absent_dirty/1, @@ -1882,19 +1882,19 @@ internal_delete(Queue, ActingUser, Reason) -> {user_who_performed_action, ActingUser}]) end. --spec forget_all_durable(node()) -> 'ok'. +-spec forget_all(node()) -> 'ok'. -%% TODO this is used by `rabbit_mnesia:remove_node_if_mnesia_running` -%% Does it make any sense once mnesia is not used/removed? -forget_all_durable(Node) -> - ?LOG_INFO("Will remove all classic queues from node ~ts. The node is likely being removed from the cluster.", [Node]), +%% This is used by `rabbit_mnesia:remove_node_if_mnesia_running/1' and +%% `rabbit_khepri:remove_*_member/1'. +forget_all(Node) -> + ?LOG_INFO("Will remove all queues from node ~ts. The node is likely being removed from the cluster.", [Node]), UpdateFun = fun(Q) -> forget_node_for_queue(Q) end, FilterFun = fun(Q) -> is_local_to_node(amqqueue:get_pid(Q), Node) end, - rabbit_db_queue:foreach_durable(UpdateFun, FilterFun). + rabbit_db_queue:foreach(UpdateFun, FilterFun). forget_node_for_queue(Q) when ?amqqueue_is_quorum(Q) -> @@ -1936,27 +1936,31 @@ is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is_pid(Q) -> -spec on_node_up(node()) -> 'ok'. -on_node_up(_Node) -> - ok. +on_node_up(Node) -> + case rabbit_khepri:is_enabled() of + true -> + %% With Khepri, we try to delete transient queues now because it's + %% possible any updates timed out because of the lack of a quorum + %% while `Node' was down. + ok = delete_transient_queues_on_node(Node); + false -> + ok + end. -spec on_node_down(node()) -> 'ok'. on_node_down(Node) -> - case delete_transient_queues_on_node(Node) of - ok -> + case rabbit_khepri:is_enabled() of + true -> + %% With Khepri, we don't delete transient/exclusive queues. There + %% may be a network partition and the node will be reachable again + %% after the partition is repaired. + %% + %% If the node will never come back, it will likely be removed from + %% the cluster. We take care of transient queues at that time. ok; - {error, timeout} -> - %% This case is possible when running Khepri. The node going down - %% could leave the cluster in a minority so the command to delete - %% the transient queue records would fail. Also see - %% `rabbit_khepri:init/0': we also try this deletion when the node - %% restarts - a time that the cluster is very likely to have a - %% majority - to ensure these records are deleted. - ?LOG_WARNING("transient queues for node '~ts' could not be " - "deleted because of a timeout. 
These queues " "will be removed when node '~ts' restarts or " "is removed from the cluster.", [Node, Node]), - ok + false -> + ok = delete_transient_queues_on_node(Node) end. -spec delete_transient_queues_on_node(Node) -> Ret when diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index 0528ab8389b0..ab66d264eaff 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -352,21 +352,66 @@ terminate_delete(EmitStats, Reason0, ReplyTo, fun() -> emit_stats(State) end); true -> ok end, - %% This try-catch block transforms throws to errors since throws are not - %% logged. When mnesia is removed this `try` can be removed: Khepri - %% returns errors as error tuples instead. - Reply = try rabbit_amqqueue:internal_delete(Q, ActingUser, Reason0) of - ok -> - {ok, Len}; - {error, _} = Err -> - Err - catch - {error, ReasonE} -> error(ReasonE) - end, - send_reply(ReplyTo, Reply), + case ReplyTo of + _ when ReplyTo =/= none -> + Reply = case delete_queue_record(Q, ActingUser, Reason0) of + ok -> + {ok, Len}; + {error, _} = Err -> + Err + end, + send_reply(ReplyTo, Reply); + none -> + %% No processes are waiting for this queue process to exit. We + %% can handle the deletion of the queue record differently: if + %% the deletion times out, we retry indefinitely. + %% + %% For instance, this allows an auto-delete queue process to + %% wait and retry until a network partition is resolved (or + %% this node stops of course). This reduces the risk of a + %% "leak" of a queue record in the metadata store. + %% + %% If for whatever reason the queue record is still leaked + %% (this process could not delete it before it was killed), the + %% "leaked" queue record will be cleaned up when the partition + %% is resolved (or this node is removed from the cluster). + %% Indeed, when the partition is resolved, all nodes are notified + %% with the `node_up' message from `rabbit_node_monitor'. This + %% calls `rabbit_amqqueue:on_node_up/1' which will delete any + %% transient queues. + %% + %% This infinite loop of delete attempts is executed in a + %% separate process to let this queue process exit. This way, + %% connections will be notified that the queue process is + %% gone. + worker_pool:submit_async( + fun() -> + _ = infinite_internal_delete(Q, ActingUser, Reason0) + end), + ok + end, BQS1 end. +infinite_internal_delete(Q, ActingUser, Reason) -> + case delete_queue_record(Q, ActingUser, Reason) of + {error, timeout} -> + _ = rabbit_khepri:fence(infinity), + infinite_internal_delete(Q, ActingUser, Reason); + Ret -> + Ret + end. + +delete_queue_record(Q, ActingUser, Reason) -> + %% This try-catch block transforms throws to errors since throws are not + %% logged. When mnesia is removed this `try` can be removed: Khepri returns + %% errors as error tuples instead. + try + rabbit_amqqueue:internal_delete(Q, ActingUser, Reason) + catch + {error, ReasonE} -> error(ReasonE) + end. + terminated_by({terminated_by, auto_delete}) -> ?INTERNAL_USER; terminated_by({terminated_by, ActingUser}) -> diff --git a/deps/rabbit/src/rabbit_db_queue.erl b/deps/rabbit/src/rabbit_db_queue.erl index ed4ee2274379..b17951b8c871 100644 --- a/deps/rabbit/src/rabbit_db_queue.erl +++ b/deps/rabbit/src/rabbit_db_queue.erl @@ -32,7 +32,8 @@ delete/2, update/2, update_decorators/2, - exists/1 + exists/1, + foreach/2 ]). %% Once mnesia is removed, all transient entities will be deleted.
These can be replaced @@ -57,8 +58,7 @@ %% Only used by rabbit_amqqueue:forget_node_for_queue, which is only called %% by `rabbit_mnesia:remove_node_if_mnesia_running`. Thus, once mnesia and/or %% HA queues are removed it can be deleted. --export([foreach_durable/2, - internal_delete/3]). +-export([internal_delete/3]). %% Storing it on Khepri is not needed, this function is just used in %% rabbit_quorum_queue to ensure the queue is present in the rabbit_queue @@ -1263,20 +1263,26 @@ foreach_transient_in_khepri(UpdateFun) -> end. %% ------------------------------------------------------------------- -%% foreach_durable(). +%% foreach(). %% ------------------------------------------------------------------- --spec foreach_durable(UpdateFun, FilterFun) -> ok when +-spec foreach(UpdateFun, FilterFun) -> ok when UpdateFun :: fun((Queue) -> any()), FilterFun :: fun((Queue) -> boolean()). -%% @doc Applies `UpdateFun' to all durable queue records that match `FilterFun'. +%% @doc Applies `UpdateFun' to all queue records that match `FilterFun'. +%% +%% With Mnesia, only durable queues are considered because we use the durable +%% queues table. +%% +%% With Khepri, all queues are considered because they are all in the same +%% "table". %% %% @private -foreach_durable(UpdateFun, FilterFun) -> +foreach(UpdateFun, FilterFun) -> rabbit_khepri:handle_fallback( #{mnesia => fun() -> foreach_durable_in_mnesia(UpdateFun, FilterFun) end, - khepri => fun() -> foreach_durable_in_khepri(UpdateFun, FilterFun) end + khepri => fun() -> foreach_in_khepri(UpdateFun, FilterFun) end }). foreach_durable_in_mnesia(UpdateFun, FilterFun) -> @@ -1292,11 +1298,8 @@ foreach_durable_in_mnesia(UpdateFun, FilterFun) -> end), ok. -foreach_durable_in_khepri(UpdateFun, FilterFun) -> - Path = khepri_queue_path( - ?KHEPRI_WILDCARD_STAR, - #if_data_matches{ - pattern = amqqueue:pattern_match_on_durable(true)}), +foreach_in_khepri(UpdateFun, FilterFun) -> + Path = khepri_queue_path(?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR), case rabbit_khepri:filter(Path, fun(_, #{data := Q}) -> FilterFun(Q) end) of diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index c7423b2731ee..1e377eecf362 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -670,7 +670,7 @@ remove_reachable_member(NodeToRemove) -> NodeToRemove, khepri_cluster, reset, [?RA_CLUSTER_NAME]), case Ret of ok -> - rabbit_amqqueue:forget_all_durable(NodeToRemove), + rabbit_amqqueue:forget_all(NodeToRemove), ?LOG_DEBUG( "Node ~s removed from Khepri cluster \"~s\"", [NodeToRemove, ?RA_CLUSTER_NAME], @@ -692,7 +692,7 @@ remove_down_member(NodeToRemove) -> Ret = ra:remove_member(ServerRef, ServerId, Timeout), case Ret of {ok, _, _} -> - rabbit_amqqueue:forget_all_durable(NodeToRemove), + rabbit_amqqueue:forget_all(NodeToRemove), ?LOG_DEBUG( "Node ~s removed from Khepri cluster \"~s\"", [NodeToRemove, ?RA_CLUSTER_NAME], diff --git a/deps/rabbit/src/rabbit_mnesia.erl b/deps/rabbit/src/rabbit_mnesia.erl index 9dd73f68b0dd..541f98b23465 100644 --- a/deps/rabbit/src/rabbit_mnesia.erl +++ b/deps/rabbit/src/rabbit_mnesia.erl @@ -916,7 +916,7 @@ remove_node_if_mnesia_running(Node) -> case mnesia:del_table_copy(schema, Node) of {atomic, ok} -> rabbit_node_monitor:notify_left_cluster(Node), - rabbit_amqqueue:forget_all_durable(Node), + rabbit_amqqueue:forget_all(Node), ok; {aborted, Reason} -> {error, {failed_to_remove_node, Node, Reason}} diff --git a/deps/rabbit/test/bindings_SUITE.erl b/deps/rabbit/test/bindings_SUITE.erl 
index 539515fd93f8..862d0f7b8e53 100644 --- a/deps/rabbit/test/bindings_SUITE.erl +++ b/deps/rabbit/test/bindings_SUITE.erl @@ -96,25 +96,36 @@ end_per_group(_, Config) -> rabbit_ct_broker_helpers:teardown_steps()). init_per_testcase(Testcase, Config) -> - Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), - Name = rabbit_data_coercion:to_binary(Testcase), - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange, [Name]), - Config2 = rabbit_ct_helpers:set_config(Config1, - [{queue_name, Name}, - {alt_queue_name, <>}, - {exchange_name, Name} - ]), - rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()). + case {Testcase, rabbit_ct_broker_helpers:configured_metadata_store(Config)} of + {transient_queue_on_node_down, khepri} -> + {skip, "Test irrelevant with Khepri"}; + _ -> + Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), + Name = rabbit_data_coercion:to_binary(Testcase), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange, [Name]), + Config2 = rabbit_ct_helpers:set_config( + Config1, + [{queue_name, Name}, + {alt_queue_name, <>}, + {exchange_name, Name} + ]), + rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()) + end. end_per_testcase(Testcase, Config) -> - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange, - [?config(exchange_name, Config)]), - Config1 = rabbit_ct_helpers:run_steps( - Config, - rabbit_ct_client_helpers:teardown_steps()), - rabbit_ct_helpers:testcase_finished(Config1, Testcase). + case {Testcase, rabbit_ct_broker_helpers:configured_metadata_store(Config)} of + {transient_queue_on_node_down, khepri} -> + Config; + _ -> + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange, + [?config(exchange_name, Config)]), + Config1 = rabbit_ct_helpers:run_steps( + Config, + rabbit_ct_client_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase) + end. %% ------------------------------------------------------------------- %% Testcases. 
diff --git a/deps/rabbit/test/clustering_recovery_SUITE.erl b/deps/rabbit/test/clustering_recovery_SUITE.erl index 1167fa0a24bc..a3f5e94c9bbc 100644 --- a/deps/rabbit/test/clustering_recovery_SUITE.erl +++ b/deps/rabbit/test/clustering_recovery_SUITE.erl @@ -33,7 +33,15 @@ groups() -> {clustered_3_nodes, [], [{cluster_size_3, [], [ force_shrink_quorum_queue, - force_shrink_all_quorum_queues + force_shrink_all_quorum_queues, + autodelete_transient_queue_after_partition_recovery_1, + autodelete_durable_queue_after_partition_recovery_1, + autodelete_transient_queue_after_node_loss, + autodelete_durable_queue_after_node_loss, + exclusive_transient_queue_after_partition_recovery_1, + exclusive_durable_queue_after_partition_recovery_1, + exclusive_transient_queue_after_node_loss, + exclusive_durable_queue_after_node_loss ]} ]} ]}, @@ -43,7 +51,19 @@ groups() -> force_standalone_boot, force_standalone_boot_and_restart, force_standalone_boot_and_restart_with_quorum_queues, - recover_after_partition_with_leader + recover_after_partition_with_leader, + autodelete_transient_queue_after_partition_recovery_1, + autodelete_durable_queue_after_partition_recovery_1, + autodelete_transient_queue_after_partition_recovery_2, + autodelete_durable_queue_after_partition_recovery_2, + autodelete_transient_queue_after_node_loss, + autodelete_durable_queue_after_node_loss, + exclusive_transient_queue_after_partition_recovery_1, + exclusive_durable_queue_after_partition_recovery_1, + exclusive_transient_queue_after_partition_recovery_2, + exclusive_durable_queue_after_partition_recovery_2, + exclusive_transient_queue_after_node_loss, + exclusive_durable_queue_after_node_loss ]} ]}, {clustered_5_nodes, [], @@ -110,9 +130,51 @@ init_per_testcase(Testcase, Config) -> {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}, {keep_pid_file_on_exit, true} ]), - rabbit_ct_helpers:run_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()). 
+ Config2 = case Testcase of + _ when Testcase =:= autodelete_transient_queue_after_partition_recovery_1 orelse + Testcase =:= autodelete_durable_queue_after_partition_recovery_1 orelse + Testcase =:= autodelete_transient_queue_after_partition_recovery_2 orelse + Testcase =:= autodelete_durable_queue_after_partition_recovery_2 orelse + Testcase =:= exclusive_transient_queue_after_partition_recovery_1 orelse + Testcase =:= exclusive_durable_queue_after_partition_recovery_1 orelse + Testcase =:= exclusive_transient_queue_after_partition_recovery_2 orelse + Testcase =:= exclusive_durable_queue_after_partition_recovery_2 -> + rabbit_ct_helpers:merge_app_env( + Config1, + {rabbit, + [{cluster_partition_handling, pause_minority}]}); + _ -> + Config1 + end, + Config3 = rabbit_ct_helpers:run_steps( + Config2, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()), + case Config3 of + _ when is_list(Config3) andalso + (Testcase =:= autodelete_transient_queue_after_partition_recovery_2 orelse + Testcase =:= autodelete_durable_queue_after_partition_recovery_2 orelse + Testcase =:= exclusive_transient_queue_after_partition_recovery_2 orelse + Testcase =:= exclusive_durable_queue_after_partition_recovery_2) -> + NewEnough = ok =:= rabbit_ct_broker_helpers:enable_feature_flag( + Config3, 'rabbitmq_4.2.0'), + case NewEnough of + true -> + Config3; + false -> + _ = rabbit_ct_helpers:run_steps( + Config3, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config3, Testcase), + {skip, + "The old node does not have improvements to " + "rabbit_node_monitor"} + end; + _ -> + %% Other testcases or failure to setup broker and client. + Config3 + end. end_per_testcase(Testcase, Config) -> Config1 = rabbit_ct_helpers:run_steps(Config, @@ -255,23 +317,7 @@ force_standalone_boot_and_restart_with_quorum_queues(Config) -> recover_after_partition_with_leader(Config) -> Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - %% We use intermediate Erlang nodes between the common_test control node - %% and the RabbitMQ nodes, using `peer' standard_io communication. The goal - %% is to make sure the common_test control node doesn't interfere with the - %% nodes the RabbitMQ nodes can see, despite the blocking of the Erlang - %% distribution connection. - Proxies0 = [begin - {ok, Proxy, PeerNode} = peer:start_link( - #{name => peer:random_name(), - connection => standard_io, - wait_boot => 120000}), - ct:pal("Proxy ~0p -> ~0p", [Proxy, PeerNode]), - Proxy - end || _ <- Nodes], - Proxies = maps:from_list(lists:zip(Nodes, Proxies0)), - ct:pal("Proxies: ~p", [Proxies]), - Config1 = [{proxies, Proxies} | Config], + Config1 = start_proxies(Config), NodeA = hd(Nodes), @@ -384,6 +430,26 @@ recover_after_partition_with_leader(Config) -> application:unset_env(kernel, dist_auto_connect), ok. +start_proxies(Config) -> + %% We use intermediate Erlang nodes between the common_test control node + %% and the RabbitMQ nodes, using `peer' standard_io communication. The + %% goal is to make sure the common_test control node doesn't interfere + %% with the nodes the RabbitMQ nodes can see, despite the blocking of the + %% Erlang distribution connection. 
+ Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Proxies0 = [begin + {ok, Proxy, PeerNode} = peer:start_link( + #{name => peer:random_name(), + connection => standard_io, + wait_boot => 120000}), + ct:pal("Proxy ~0p -> ~0p", [Proxy, PeerNode]), + Proxy + end || _ <- Nodes], + Proxies = maps:from_list(lists:zip(Nodes, Proxies0)), + ct:pal("Proxies: ~p", [Proxies]), + Config1 = [{proxies, Proxies} | Config], + Config1. + proxied_rpc(Config, Node, Module, Function, Args) -> Proxies = ?config(proxies, Config), Proxy = maps:get(Node, Proxies), @@ -393,9 +459,16 @@ proxied_rpc(Config, Node, Module, Function, Args) -> get_leader_node(Config, Node) -> StoreId = rabbit_khepri:get_store_id(), - Ret = proxied_rpc( - Config, Node, - ra_leaderboard, lookup_leader, [StoreId]), + Ret = case rabbit_ct_helpers:get_config(Config, proxies) of + undefined -> + rabbit_ct_broker_helpers:rpc( + Config, Node, + ra_leaderboard, lookup_leader, [StoreId]); + _ -> + proxied_rpc( + Config, Node, + ra_leaderboard, lookup_leader, [StoreId]) + end, case Ret of {StoreId, LeaderNode} -> {ok, LeaderNode}; @@ -485,6 +558,552 @@ forget_down_node(Config) -> ok. +autodelete_transient_queue_after_partition_recovery_1(Config) -> + QueueDeclare = #'queue.declare'{auto_delete = true, + durable = false}, + temporary_queue_after_partition_recovery_1(Config, QueueDeclare). + +autodelete_durable_queue_after_partition_recovery_1(Config) -> + QueueDeclare = #'queue.declare'{auto_delete = true, + durable = true}, + temporary_queue_after_partition_recovery_1(Config, QueueDeclare). + +exclusive_transient_queue_after_partition_recovery_1(Config) -> + QueueDeclare = #'queue.declare'{exclusive = true, + durable = false}, + temporary_queue_after_partition_recovery_1(Config, QueueDeclare). + +exclusive_durable_queue_after_partition_recovery_1(Config) -> + QueueDeclare = #'queue.declare'{exclusive = true, + durable = true}, + temporary_queue_after_partition_recovery_1(Config, QueueDeclare). + +temporary_queue_after_partition_recovery_1(Config, QueueDeclare) -> + [_Node1, Node2 | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + Majority = Nodes -- [Node2], + Timeout = 60000, + + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, Node2), + CMRef = erlang:monitor(process, Conn), + + %% We create a temporary queue (exclusive or auto-delete, depending on + %% `QueueDeclare') on node 2 and get its PID on the server side. + ?assertMatch(#'queue.declare_ok'{}, amqp_channel:call(Ch, QueueDeclare)), + Queues = rabbit_ct_broker_helpers:rpc( + Config, Node2, rabbit_amqqueue, list, []), + ?assertMatch([_], Queues), + [Queue] = Queues, + ct:pal("Queue = ~p", [Queue]), + + QName = amqqueue:get_name(Queue), + QPid = amqqueue:get_pid(Queue), + QMRef = erlang:monitor(process, QPid), + subscribe(Ch, QName#resource.name), + + lists:foreach( + fun(Node) -> + rabbit_ct_broker_helpers:block_traffic_between(Node2, Node) + end, Majority), + clustering_utils:assert_cluster_status({Nodes, Majority}, Majority), + + IsAutoDeleteDurable = case QueueDeclare of + #'queue.declare'{auto_delete = true, + durable = true} -> + true; + _ -> + false + end, + case rabbit_ct_broker_helpers:configured_metadata_store(Config) of + mnesia when not IsAutoDeleteDurable -> + clustering_utils:assert_cluster_status({Nodes, []}, [Node2]), + + %% With Mnesia, the client connection is terminated (the node is + %% stopped thanks to the pause_minority partition handling) and + %% the temporary queue is deleted.
+ receive + {'DOWN', CMRef, _, _, Reason1} -> + ct:pal("Connection ~p exited: ~p", [Conn, Reason1]), + case Reason1 of + {shutdown, {server_initiated_close, _, _}} -> + ok; + {channel0_died, {shutdown, _}} -> + ok; + _ -> + ct:fail("Unexpected termination reason: ~p", [Reason1]) + end, + ok + after Timeout -> + ct:fail("Connection ~p still running", [Conn]) + end, + receive + {'DOWN', QMRef, _, _, Reason2} -> + ct:pal("Queue ~p exited: ~p", [QPid, Reason2]), + ?assertEqual(normal, Reason2), + ok + after Timeout -> + ct:fail("Queue ~p still running", [QPid]) + end, + + %% The queue was also deleted from the metadata store on nodes on + %% the majority side. + lists:foreach( + fun(Node) -> + ?awaitMatch( + {error, not_found}, + begin + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, + rabbit_amqqueue, lookup, [QName]), + ct:pal( + "Queue lookup on node ~0p: ~p", + [Node, Ret]), + Ret + end, Timeout) + end, Majority), + + %% We can resolve the network partition. + lists:foreach( + fun(Node) -> + rabbit_ct_broker_helpers:allow_traffic_between( + Node2, Node) + end, Majority), + clustering_utils:assert_cluster_status({Nodes, Nodes}, Nodes), + + %% The queue is not recorded anywhere. + lists:foreach( + fun(Node) -> + ?awaitMatch( + {error, not_found}, + begin + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, + rabbit_amqqueue, lookup, [QName]), + ct:pal( + "Queue lookup on node ~0p: ~p", + [Node, Ret]), + Ret + end, Timeout) + end, Nodes), + ok; + + mnesia when IsAutoDeleteDurable -> + %% An auto-delete durable queue seems to survive a network + %% partition or a node loss. Thus, there is nothing to test in the + %% scope of this test case. + ok; + + khepri -> + clustering_utils:assert_cluster_status({Nodes, [Node2]}, [Node2]), + + %% The queue is still recorded everywhere. + lists:foreach( + fun(Node) -> + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, rabbit_amqqueue, lookup, [QName]), + ct:pal("Queue lookup on node ~0p: ~p", [Node, Ret]), + ?assertEqual({ok, Queue}, Ret) + end, Nodes), + + %% Prepare a publisher. + {PConn, + PCh} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, Node2), + publish_many(PCh, QName#resource.name, 10), + consume(10), + + %% We resolve the network partition. + lists:foreach( + fun(Node) -> + rabbit_ct_broker_helpers:allow_traffic_between( + Node2, Node) + end, Majority), + clustering_utils:assert_cluster_status({Nodes, Nodes}, Nodes), + + publish_many(PCh, QName#resource.name, 10), + consume(10), + + rabbit_ct_client_helpers:close_connection_and_channel(PConn, PCh), + + %% We terminate the channel and connection: the queue should + %% terminate and the metadata store should have no record of it. + _ = rabbit_ct_client_helpers:close_connection_and_channel( + Conn, Ch), + + receive + {'DOWN', CMRef, _, _, Reason1} -> + ct:pal("Connection ~p exited: ~p", [Conn, Reason1]), + ?assertEqual({shutdown, normal}, Reason1), + ok + after Timeout -> + ct:fail("Connection ~p still running", [Conn]) + end, + receive + {'DOWN', QMRef, _, _, Reason} -> + ct:pal("Queue ~p exited: ~p", [QPid, Reason]), + ?assertEqual(normal, Reason), + ok + after Timeout -> + ct:fail("Queue ~p still running", [QPid]) + end, + + %% The queue was also deleted from the metadata store on all + %% nodes. + lists:foreach( + fun(Node) -> + ?awaitMatch( + {error, not_found}, + begin + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, + rabbit_amqqueue, lookup, [QName]), + ct:pal( + "Queue lookup on node ~0p: ~p", + [Node, Ret]), + Ret + end, Timeout) + end, Nodes), + ok + end.
+ +autodelete_transient_queue_after_partition_recovery_2(Config) -> + QueueDeclare = #'queue.declare'{auto_delete = true, + durable = false}, + temporary_queue_after_partition_recovery_2(Config, QueueDeclare). + +autodelete_durable_queue_after_partition_recovery_2(Config) -> + QueueDeclare = #'queue.declare'{auto_delete = true, + durable = true}, + temporary_queue_after_partition_recovery_2(Config, QueueDeclare). + +exclusive_transient_queue_after_partition_recovery_2(Config) -> + QueueDeclare = #'queue.declare'{exclusive = true, + durable = false}, + temporary_queue_after_partition_recovery_2(Config, QueueDeclare). + +exclusive_durable_queue_after_partition_recovery_2(Config) -> + QueueDeclare = #'queue.declare'{exclusive = true, + durable = true}, + temporary_queue_after_partition_recovery_2(Config, QueueDeclare). + +temporary_queue_after_partition_recovery_2(Config, QueueDeclare) -> + [_Node1, Node2 | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + Majority = Nodes -- [Node2], + Timeout = 60000, + + {Conn1, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, Node2), + CMRef1 = erlang:monitor(process, Conn1), + {Conn2, Ch2} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, Node2), + CMRef2 = erlang:monitor(process, Conn2), + + %% We create two temporary queues on node 2 and get their PIDs on the + %% server side. + ?assertMatch(#'queue.declare_ok'{}, amqp_channel:call(Ch1, QueueDeclare)), + ?assertMatch(#'queue.declare_ok'{}, amqp_channel:call(Ch2, QueueDeclare)), + Queues = rabbit_ct_broker_helpers:rpc( + Config, Node2, rabbit_amqqueue, list, []), + ?assertMatch([_, _], Queues), + [Queue1, Queue2] = Queues, + ct:pal("Queues = ~p", [Queues]), + + QName1 = amqqueue:get_name(Queue1), + QPid1 = amqqueue:get_pid(Queue1), + QMRef1 = erlang:monitor(process, QPid1), + subscribe(Ch1, QName1#resource.name), + + QName2 = amqqueue:get_name(Queue2), + QPid2 = amqqueue:get_pid(Queue2), + QMRef2 = erlang:monitor(process, QPid2), + subscribe(Ch2, QName2#resource.name), + + lists:foreach( + fun(Node) -> + rabbit_ct_broker_helpers:block_traffic_between(Node2, Node) + end, Majority), + clustering_utils:assert_cluster_status({Nodes, Majority}, Majority), + clustering_utils:assert_cluster_status({Nodes, [Node2]}, [Node2]), + + %% The queues are still recorded everywhere. + lists:foreach( + fun(Node) -> + Ret1 = rabbit_ct_broker_helpers:rpc( + Config, Node, rabbit_amqqueue, lookup, [QName1]), + Ret2 = rabbit_ct_broker_helpers:rpc( + Config, Node, rabbit_amqqueue, lookup, [QName2]), + ct:pal( + "Queues lookup on node ~0p:~n ~p~n~p", + [Node, Ret1, Ret2]), + ?assertEqual({ok, Queue1}, Ret1), + ?assertEqual({ok, Queue2}, Ret2) + end, Nodes), + + %% Publish to and consume from the queues. + ct:pal("Open connection"), + {_PConn, PCh} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, Node2), + ct:pal("Publish messages to Q1"), + publish_many(PCh, QName1#resource.name, 10), + ct:pal("Publish messages to Q2"), + publish_many(PCh, QName2#resource.name, 10), + ct:pal("Consume all messages"), + consume(20), + + %% Close the first consuming client to trigger the queue deletion during + %% the network partition. Because of the network partition, the queue + %% process exits but it cannot delete the queue record.
+ ct:pal("Close connection 1"), + _ = spawn(fun() -> + rabbit_ct_client_helpers:close_connection_and_channel( + Conn1, Ch1) + end), + + ct:pal("Wait for connection 1 DOWN"), + receive + {'DOWN', CMRef1, _, _, Reason1_1} -> + ct:pal("Connection ~p exited: ~p", [Conn1, Reason1_1]), + ?assertEqual({shutdown, normal}, Reason1_1), + ok + after Timeout -> + ct:fail("Connection ~p still running", [Conn1]) + end, + ct:pal("Wait for queue 1 DOWN"), + receive + {'DOWN', QMRef1, _, _, Reason1_2} -> + ct:pal("Queue ~p exited: ~p", [QPid1, Reason1_2]), + ?assertEqual(normal, Reason1_2), + ok + after Timeout -> + ct:fail("Queue ~p still running", [QPid1]) + end, + + %% We sleep to let the queue record deletion reach the timeout. It should + %% retry indefinitely. + KhepriTimeout = rabbit_ct_broker_helpers:rpc( + Config, Node2, khepri_app, get_default_timeout, []), + ct:pal("Sleep > ~b ms", [KhepriTimeout]), + timer:sleep(KhepriTimeout + 10000), + + %% The queue process exited but the queue record should still be there. The + %% temporary process is still trying to delete it but can't during the + %% network partition. + ?awaitMatch( + {ok, _}, + begin + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node2, rabbit_amqqueue, lookup, [QName1]), + ct:pal("Queue lookup on node ~0p: ~p", [Node2, Ret]), + Ret + end, Timeout), + + %% Close the second consuming client to trigger the queue deletion during + %% the network partition. This time, the partition is solved while the + %% queue process tries to delete the record. + ct:pal("Close connection 2"), + _ = spawn(fun() -> + rabbit_ct_client_helpers:close_connection_and_channel( + Conn2, Ch2) + end), + + ct:pal("Wait for connection 2 DOWN"), + receive + {'DOWN', CMRef2, _, _, Reason2_1} -> + ct:pal("Connection ~p exited: ~p", [Conn2, Reason2_1]), + ?assertEqual({shutdown, normal}, Reason2_1), + ok + after Timeout -> + ct:fail("Connection ~p still running", [Conn2]) + end, + ct:pal("Wait for queue 2 DOWN"), + receive + {'DOWN', QMRef2, _, _, Reason2_2} -> + ct:pal("Queue ~p exited: ~p", [QPid2, Reason2_2]), + ?assertEqual(normal, Reason2_2), + ok + after Timeout -> + ct:fail("Queue ~p still running", [QPid2]) + end, + + %% Again, the queue process exited but the queue record should still be + %% there. The temporary process is still trying to delete it but can't + %% during the network partition. + ?awaitMatch( + {ok, _}, + begin + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node2, rabbit_amqqueue, lookup, [QName2]), + ct:pal("Queue lookup on node ~0p: ~p", [Node2, Ret]), + Ret + end, Timeout), + + %% We resolve the network partition. + lists:foreach( + fun(Node) -> + ct:pal("Allow traffic with ~s", [Node]), + rabbit_ct_broker_helpers:allow_traffic_between( + Node2, Node) + end, Majority), + ct:pal("Cluster status"), + clustering_utils:assert_cluster_status({Nodes, Nodes}, Nodes), + + %% The first queue was deleted from the metadata store on all nodes. + lists:foreach( + fun(Node) -> + ?awaitMatch( + {error, not_found}, + begin + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, rabbit_amqqueue, lookup, [QName1]), + ct:pal("Queue lookup on node ~0p: ~p", [Node, Ret]), + Ret + end, Timeout) + end, Nodes), + + %% The second queue was deleted from the metadata store on all nodes. + lists:foreach( + fun(Node) -> + ?awaitMatch( + {error, not_found}, + begin + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, rabbit_amqqueue, lookup, [QName2]), + ct:pal("Queue lookup on node ~0p: ~p", [Node, Ret]), + Ret + end, Timeout) + end, Nodes), + ok. 
+ +autodelete_transient_queue_after_node_loss(Config) -> + QueueDeclare = #'queue.declare'{auto_delete = true, + durable = false}, + temporary_queue_after_node_loss(Config, QueueDeclare). + +autodelete_durable_queue_after_node_loss(Config) -> + QueueDeclare = #'queue.declare'{auto_delete = true, + durable = true}, + temporary_queue_after_node_loss(Config, QueueDeclare). + +exclusive_transient_queue_after_node_loss(Config) -> + QueueDeclare = #'queue.declare'{exclusive = true, + durable = false}, + temporary_queue_after_node_loss(Config, QueueDeclare). + +exclusive_durable_queue_after_node_loss(Config) -> + QueueDeclare = #'queue.declare'{exclusive = true, + durable = true}, + temporary_queue_after_node_loss(Config, QueueDeclare). + +temporary_queue_after_node_loss(Config, QueueDeclare) -> + [Node1, Node2, Node3] = Nodes = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + Majority = Nodes -- [Node2], + Timeout = 60000, + + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, Node2), + + %% We create a temporary queue on node 2. + ?assertMatch(#'queue.declare_ok'{}, amqp_channel:call(Ch, QueueDeclare)), + Queues = rabbit_ct_broker_helpers:rpc( + Config, Node2, rabbit_amqqueue, list, []), + ?assertMatch([_], Queues), + [Queue] = Queues, + ct:pal("Queue = ~p", [Queue]), + + QName = amqqueue:get_name(Queue), + + %% We kill the node. + rabbit_ct_broker_helpers:kill_node(Config, Node2), + + ct:pal("Wait for new Khepri leader to be elected"), + lists:foreach( + fun(Node) -> + ?awaitMatch( + {ok, LeaderNode} + when LeaderNode =:= Node1 orelse LeaderNode =:= Node3, + get_leader_node(Config, Node), + Timeout) + end, Majority), + + IsAutoDeleteDurable = case QueueDeclare of + #'queue.declare'{auto_delete = true, + durable = true} -> + true; + _ -> + false + end, + case rabbit_ct_broker_helpers:configured_metadata_store(Config) of + mnesia when not IsAutoDeleteDurable -> + clustering_utils:assert_cluster_status( + {Nodes, Majority}, Majority), + + %% The queue is already deleted from the metadata store on + %% remaining nodes. + lists:foreach( + fun(Node) -> + ?awaitMatch( + {error, not_found}, + begin + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, + rabbit_amqqueue, lookup, [QName]), + ct:pal( + "Queue lookup on node ~0p: ~p", + [Node, Ret]), + Ret + end, Timeout) + end, Majority), + ok; + + mnesia when IsAutoDeleteDurable -> + %% An auto-delete durable queue seems to survive a network + %% partition or a node loss. Thus, there is nothing to test in the + %% scope of this test case. + ok; + + khepri -> + clustering_utils:assert_cluster_status( + {Nodes, Majority}, Majority), + + %% The queue is still recorded on the remaining nodes. + lists:foreach( + fun(Node) -> + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, rabbit_amqqueue, lookup, [QName]), + ct:pal("Queue lookup on node ~0p: ~p", [Node, Ret]), + ?assertEqual({ok, Queue}, Ret) + end, Majority), + + %% We remove the lost node from the cluster. + ?assertEqual( + ok, + rabbit_ct_broker_helpers:forget_cluster_node( + Config, Node3, Node2)), + clustering_utils:assert_cluster_status( + {Majority, Majority}, Majority), + + %% The queue was deleted from the metadata store on remaining + %% nodes. + lists:foreach( + fun(Node) -> + ?awaitMatch( + {error, not_found}, + begin + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, + rabbit_amqqueue, lookup, [QName]), + ct:pal( + "Queue lookup on node ~0p: ~p", + [Node, Ret]), + Ret + end, Timeout) + end, Majority), + ok + end.
+ %% ------------------------------------------------------------------- %% Internal utils %% ------------------------------------------------------------------- diff --git a/deps/rabbit/test/rabbit_db_queue_SUITE.erl b/deps/rabbit/test/rabbit_db_queue_SUITE.erl index 9ee433524869..ba6245695c9f 100644 --- a/deps/rabbit/test/rabbit_db_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_db_queue_SUITE.erl @@ -500,7 +500,7 @@ foreach_durable1(_Config) -> QName1 = rabbit_misc:r(?VHOST, queue, <<"test-queue1">>), Q1 = new_queue(QName1, rabbit_classic_queue), ?assertEqual(ok, rabbit_db_queue:set(Q1)), - ?assertEqual(ok, rabbit_db_queue:foreach_durable( + ?assertEqual(ok, rabbit_db_queue:foreach( fun(Q0) -> rabbit_db_queue:internal_delete(amqqueue:get_name(Q0), true, normal) end, @@ -566,7 +566,7 @@ internal_delete1(_Config) -> QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>), Q = new_queue(QName, rabbit_classic_queue), ?assertEqual(ok, rabbit_db_queue:set(Q)), - ?assertEqual(ok, rabbit_db_queue:foreach_durable( + ?assertEqual(ok, rabbit_db_queue:foreach( fun(Q0) -> rabbit_db_queue:internal_delete(amqqueue:get_name(Q0), false, normal) end, fun(Q0) when ?is_amqqueue(Q0) -> true end)),