diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 3bb8f53f3bf..c4a6681101e 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -9,7 +9,7 @@ -export([recover/1, stop/1, start/1, declare/6, declare/7, delete_immediately/1, delete_exclusive/2, delete/4, purge/1, - forget_all_durable/1]). + forget_all/1]). -export([pseudo_queue/2, pseudo_queue/3]). -export([exists/1, lookup/1, lookup/2, lookup_many/1, lookup_durable_queue/1, not_found_or_absent_dirty/1, @@ -1890,19 +1890,19 @@ internal_delete(Queue, ActingUser, Reason) -> {user_who_performed_action, ActingUser}]) end. --spec forget_all_durable(node()) -> 'ok'. +-spec forget_all(node()) -> 'ok'. -%% TODO this is used by `rabbit_mnesia:remove_node_if_mnesia_running` -%% Does it make any sense once mnesia is not used/removed? -forget_all_durable(Node) -> - ?LOG_INFO("Will remove all classic queues from node ~ts. The node is likely being removed from the cluster.", [Node]), +%% This is used by `rabbit_mnesia:remove_node_if_mnesia_running/1' and +%% `rabbit_khepri:remove_*_member/1'. +forget_all(Node) -> + ?LOG_INFO("Will remove all queues from node ~ts. The node is likely being removed from the cluster.", [Node]), UpdateFun = fun(Q) -> forget_node_for_queue(Q) end, FilterFun = fun(Q) -> is_local_to_node(amqqueue:get_pid(Q), Node) end, - rabbit_db_queue:foreach_durable(UpdateFun, FilterFun). + rabbit_db_queue:foreach(UpdateFun, FilterFun). forget_node_for_queue(Q) when ?amqqueue_is_quorum(Q) -> @@ -1950,21 +1950,17 @@ on_node_up(_Node) -> -spec on_node_down(node()) -> 'ok'. on_node_down(Node) -> - case delete_transient_queues_on_node(Node) of - ok -> + case rabbit_khepri:is_enabled() of + true -> + %% With Khepri, we don't delete transient/exclusive queues. There + %% may be a network partition and the node will be reachable again + %% after the partition is repaired. 
+ %% + %% If the node will never come back, it will likely be removed from + %% the cluster. We take care of transient queues at that time. ok; - {error, timeout} -> - %% This case is possible when running Khepri. The node going down - %% could leave the cluster in a minority so the command to delete - %% the transient queue records would fail. Also see - %% `rabbit_khepri:init/0': we also try this deletion when the node - %% restarts - a time that the cluster is very likely to have a - %% majority - to ensure these records are deleted. - ?LOG_WARNING("transient queues for node '~ts' could not be " - "deleted because of a timeout. These queues " - "will be removed when node '~ts' restarts or " - "is removed from the cluster.", [Node, Node]), - ok + false -> + ok = delete_transient_queues_on_node(Node) end. -spec delete_transient_queues_on_node(Node) -> Ret when diff --git a/deps/rabbit/src/rabbit_db_queue.erl b/deps/rabbit/src/rabbit_db_queue.erl index 4cc90e39ac4..2a4731fdcf8 100644 --- a/deps/rabbit/src/rabbit_db_queue.erl +++ b/deps/rabbit/src/rabbit_db_queue.erl @@ -32,7 +32,8 @@ delete/2, update/2, update_decorators/2, - exists/1 + exists/1, + foreach/2 ]). %% Once mnesia is removed, all transient entities will be deleted. These can be replaced @@ -57,8 +58,7 @@ %% Only used by rabbit_amqqueue:forget_node_for_queue, which is only called %% by `rabbit_mnesia:remove_node_if_mnesia_running`. Thus, once mnesia and/or %% HA queues are removed it can be deleted. --export([foreach_durable/2, - internal_delete/3]). +-export([internal_delete/3]). %% Storing it on Khepri is not needed, this function is just used in %% rabbit_quorum_queue to ensure the queue is present in the rabbit_queue @@ -1250,20 +1250,26 @@ foreach_transient_in_khepri(UpdateFun) -> end. %% ------------------------------------------------------------------- -%% foreach_durable(). +%% foreach(). 
%% ------------------------------------------------------------------- --spec foreach_durable(UpdateFun, FilterFun) -> ok when +-spec foreach(UpdateFun, FilterFun) -> ok when UpdateFun :: fun((Queue) -> any()), FilterFun :: fun((Queue) -> boolean()). -%% @doc Applies `UpdateFun' to all durable queue records that match `FilterFun'. +%% @doc Applies `UpdateFun' to all queue records that match `FilterFun'. +%% +%% With Mnesia, only durable queues are considered because we use the durable +%% queues table. +%% +%% With Khepri, all queues are considered because they are all in the same +%% "table". %% %% @private -foreach_durable(UpdateFun, FilterFun) -> +foreach(UpdateFun, FilterFun) -> rabbit_khepri:handle_fallback( #{mnesia => fun() -> foreach_durable_in_mnesia(UpdateFun, FilterFun) end, - khepri => fun() -> foreach_durable_in_khepri(UpdateFun, FilterFun) end + khepri => fun() -> foreach_in_khepri(UpdateFun, FilterFun) end }). foreach_durable_in_mnesia(UpdateFun, FilterFun) -> @@ -1279,11 +1285,8 @@ foreach_durable_in_mnesia(UpdateFun, FilterFun) -> end), ok. 
-foreach_durable_in_khepri(UpdateFun, FilterFun) -> - Path = khepri_queue_path( - ?KHEPRI_WILDCARD_STAR, - #if_data_matches{ - pattern = amqqueue:pattern_match_on_durable(true)}), +foreach_in_khepri(UpdateFun, FilterFun) -> + Path = khepri_queue_path(?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR), case rabbit_khepri:filter(Path, fun(_, #{data := Q}) -> FilterFun(Q) end) of diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 9357f231801..67ded48dbee 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -670,7 +670,7 @@ remove_reachable_member(NodeToRemove) -> NodeToRemove, khepri_cluster, reset, [?RA_CLUSTER_NAME]), case Ret of ok -> - rabbit_amqqueue:forget_all_durable(NodeToRemove), + rabbit_amqqueue:forget_all(NodeToRemove), ?LOG_DEBUG( "Node ~s removed from Khepri cluster \"~s\"", [NodeToRemove, ?RA_CLUSTER_NAME], @@ -692,7 +692,7 @@ remove_down_member(NodeToRemove) -> Ret = ra:remove_member(ServerRef, ServerId, Timeout), case Ret of {ok, _, _} -> - rabbit_amqqueue:forget_all_durable(NodeToRemove), + rabbit_amqqueue:forget_all(NodeToRemove), ?LOG_DEBUG( "Node ~s removed from Khepri cluster \"~s\"", [NodeToRemove, ?RA_CLUSTER_NAME], diff --git a/deps/rabbit/src/rabbit_mnesia.erl b/deps/rabbit/src/rabbit_mnesia.erl index 1299293b5b2..f9ea0833ce5 100644 --- a/deps/rabbit/src/rabbit_mnesia.erl +++ b/deps/rabbit/src/rabbit_mnesia.erl @@ -916,7 +916,7 @@ remove_node_if_mnesia_running(Node) -> case mnesia:del_table_copy(schema, Node) of {atomic, ok} -> rabbit_node_monitor:notify_left_cluster(Node), - rabbit_amqqueue:forget_all_durable(Node), + rabbit_amqqueue:forget_all(Node), ok; {aborted, Reason} -> {error, {failed_to_remove_node, Node, Reason}} diff --git a/deps/rabbit/test/bindings_SUITE.erl b/deps/rabbit/test/bindings_SUITE.erl index 539515fd93f..862d0f7b8e5 100644 --- a/deps/rabbit/test/bindings_SUITE.erl +++ b/deps/rabbit/test/bindings_SUITE.erl @@ -96,25 +96,36 @@ end_per_group(_, Config) 
-> rabbit_ct_broker_helpers:teardown_steps()). init_per_testcase(Testcase, Config) -> - Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), - Name = rabbit_data_coercion:to_binary(Testcase), - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange, [Name]), - Config2 = rabbit_ct_helpers:set_config(Config1, - [{queue_name, Name}, - {alt_queue_name, <<Name/binary, "_alt">>}, - {exchange_name, Name} - ]), - rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()). + case {Testcase, rabbit_ct_broker_helpers:configured_metadata_store(Config)} of + {transient_queue_on_node_down, khepri} -> + {skip, "Test irrelevant with Khepri"}; + _ -> + Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), + Name = rabbit_data_coercion:to_binary(Testcase), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange, [Name]), + Config2 = rabbit_ct_helpers:set_config( + Config1, + [{queue_name, Name}, + {alt_queue_name, <<Name/binary, "_alt">>}, + {exchange_name, Name} + ]), + rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()) + end. end_per_testcase(Testcase, Config) -> - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange, - [?config(exchange_name, Config)]), - Config1 = rabbit_ct_helpers:run_steps( - Config, - rabbit_ct_client_helpers:teardown_steps()), - rabbit_ct_helpers:testcase_finished(Config1, Testcase). 
+ case {Testcase, rabbit_ct_broker_helpers:configured_metadata_store(Config)} of + {transient_queue_on_node_down, khepri} -> + Config; + _ -> + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange, + [?config(exchange_name, Config)]), + Config1 = rabbit_ct_helpers:run_steps( + Config, + rabbit_ct_client_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase) + end. %% ------------------------------------------------------------------- %% Testcases. diff --git a/deps/rabbit/test/clustering_recovery_SUITE.erl b/deps/rabbit/test/clustering_recovery_SUITE.erl index 1167fa0a24b..503bcc36b3b 100644 --- a/deps/rabbit/test/clustering_recovery_SUITE.erl +++ b/deps/rabbit/test/clustering_recovery_SUITE.erl @@ -33,7 +33,11 @@ groups() -> {clustered_3_nodes, [], [{cluster_size_3, [], [ force_shrink_quorum_queue, - force_shrink_all_quorum_queues + force_shrink_all_quorum_queues, + autodelete_queue_after_partition_recovery, + autodelete_queue_after_node_loss, + exclusive_queue_after_partition_recovery, + exclusive_queue_after_node_loss ]} ]} ]}, @@ -43,7 +47,11 @@ groups() -> force_standalone_boot, force_standalone_boot_and_restart, force_standalone_boot_and_restart_with_quorum_queues, - recover_after_partition_with_leader + recover_after_partition_with_leader, + autodelete_queue_after_partition_recovery, + autodelete_queue_after_node_loss, + exclusive_queue_after_partition_recovery, + exclusive_queue_after_node_loss ]} ]}, {clustered_5_nodes, [], @@ -110,7 +118,17 @@ init_per_testcase(Testcase, Config) -> {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}, {keep_pid_file_on_exit, true} ]), - rabbit_ct_helpers:run_steps(Config1, + Config2 = case Testcase of + _ when Testcase =:= autodelete_queue_after_partition_recovery orelse + Testcase =:= exclusive_queue_after_partition_recovery -> + rabbit_ct_helpers:merge_app_env( + Config1, + {rabbit, + 
+ [{cluster_partition_handling, pause_minority}]}); + _ -> + Config1 + end, + rabbit_ct_helpers:run_steps(Config2, rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()). @@ -485,6 +503,235 @@ forget_down_node(Config) -> ok. +autodelete_queue_after_partition_recovery(Config) -> + QueueDeclare = #'queue.declare'{auto_delete = true}, + temporary_queue_after_partition_recovery(Config, QueueDeclare). + +exclusive_queue_after_partition_recovery(Config) -> + QueueDeclare = #'queue.declare'{exclusive = true}, + temporary_queue_after_partition_recovery(Config, QueueDeclare). + +temporary_queue_after_partition_recovery(Config, QueueDeclare) -> + [_Node1, Node2, _Node3] = Nodes = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + Majority = Nodes -- [Node2], + Timeout = 60000, + + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, Node2), + CMRef = erlang:monitor(process, Conn), + + %% We create a temporary queue (exclusive or auto-delete, depending on + %% `QueueDeclare') on Node2 and get its PID on the server side. + ?assertMatch(#'queue.declare_ok'{}, amqp_channel:call(Ch, QueueDeclare)), + Queues = rabbit_ct_broker_helpers:rpc( + Config, Node2, rabbit_amqqueue, list, []), + ?assertMatch([_], Queues), + [Queue] = Queues, + ct:pal("Queue = ~p", [Queue]), + + QName = amqqueue:get_name(Queue), + QPid = amqqueue:get_pid(Queue), + QMRef = erlang:monitor(process, QPid), + subscribe(Ch, QName#resource.name), + + lists:foreach( + fun(Node) -> + rabbit_ct_broker_helpers:block_traffic_between(Node2, Node) + end, Majority), + clustering_utils:assert_cluster_status({Nodes, Majority}, Majority), + + case rabbit_ct_broker_helpers:configured_metadata_store(Config) of + mnesia -> + clustering_utils:assert_cluster_status({Nodes, []}, [Node2]), + + %% With Mnesia, the client connection is terminated (the node is + %% stopped thanks to the pause_minority partition handling) and + %% the temporary queue is deleted. 
+ receive + {'DOWN', CMRef, _, _, Reason1} -> + ct:pal("Connection ~p exited: ~p", [Conn, Reason1]), + ?assertMatch( + {shutdown, {server_initiated_close, _, _}}, + Reason1), + ok + after Timeout -> + ct:fail("Connection ~p still running", [Conn]) + end, + receive + {'DOWN', QMRef, _, _, Reason2} -> + ct:pal("Queue ~p exited: ~p", [QPid, Reason2]), + ?assertEqual(normal, Reason2), + ok + after Timeout -> + ct:fail("Queue ~p still running", [QPid]) + end, + + %% The queue was also deleted from the metadata store on nodes on + %% the majority side. + lists:foreach( + fun(Node) -> + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, rabbit_amqqueue, lookup, [QName]), + ct:pal("Queue lookup on node ~0p: ~p", [Node, Ret]), + ?assertEqual({error, not_found}, Ret) + end, Majority), + + %% We can resolve the network partition. + lists:foreach( + fun(Node) -> + rabbit_ct_broker_helpers:allow_traffic_between( + Node2, Node) + end, Majority), + clustering_utils:assert_cluster_status({Nodes, Nodes}, Nodes), + + %% The queue is not recorded anywhere. + lists:foreach( + fun(Node) -> + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, rabbit_amqqueue, lookup, [QName]), + ct:pal("Queue lookup on node ~0p: ~p", [Node, Ret]), + ?assertEqual({error, not_found}, Ret) + end, Nodes), + ok; + + khepri -> + clustering_utils:assert_cluster_status({Nodes, [Node2]}, [Node2]), + + %% The queue is still recorded everywhere. + lists:foreach( + fun(Node) -> + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, rabbit_amqqueue, lookup, [QName]), + ct:pal("Queue lookup on node ~0p: ~p", [Node, Ret]), + ?assertEqual({ok, Queue}, Ret) + end, Nodes), + + %% Prepare a publisher. + {PConn, PCh} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, Node2), + publish_many(PCh, QName#resource.name, 10), + consume(10), + + %% We resolve the network partition. 
+ lists:foreach( + fun(Node) -> + rabbit_ct_broker_helpers:allow_traffic_between( + Node2, Node) + end, Majority), + clustering_utils:assert_cluster_status({Nodes, Nodes}, Nodes), + + publish_many(PCh, QName#resource.name, 10), + consume(10), + + rabbit_ct_client_helpers:close_connection_and_channel(PConn, PCh), + + %% We terminate the channel and connection: the queue should + %% terminate and the metadata store should have no record of it. + _ = rabbit_ct_client_helpers:close_connection_and_channel( + Conn, Ch), + + receive + {'DOWN', CMRef, _, _, Reason1} -> + ct:pal("Connection ~p exited: ~p", [Conn, Reason1]), + ?assertEqual({shutdown, normal}, Reason1), + ok + after Timeout -> + ct:fail("Connection ~p still running", [Conn]) + end, + receive + {'DOWN', QMRef, _, _, Reason} -> + ct:pal("Queue ~p exited: ~p", [QPid, Reason]), + ?assertEqual(normal, Reason), + ok + after Timeout -> + ct:fail("Queue ~p still running", [QPid]) + end, + + %% The queue was also deleted from the metadata store on all + %% nodes. + lists:foreach( + fun(Node) -> + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, rabbit_amqqueue, lookup, [QName]), + ct:pal("Queue lookup on node ~0p: ~p", [Node, Ret]), + ?assertEqual({error, not_found}, Ret) + end, Nodes), + ok + end. + +autodelete_queue_after_node_loss(Config) -> + QueueDeclare = #'queue.declare'{auto_delete = true}, + temporary_queue_after_node_loss(Config, QueueDeclare). + +exclusive_queue_after_node_loss(Config) -> + QueueDeclare = #'queue.declare'{exclusive = true}, + temporary_queue_after_node_loss(Config, QueueDeclare). + +temporary_queue_after_node_loss(Config, QueueDeclare) -> + [_Node1, Node2, Node3] = Nodes = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + Majority = Nodes -- [Node2], + + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, Node2), + + %% We create a temporary queue (exclusive or auto-delete) on Node2. 
+ ?assertMatch(#'queue.declare_ok'{}, amqp_channel:call(Ch, QueueDeclare)), + Queues = rabbit_ct_broker_helpers:rpc( + Config, Node2, rabbit_amqqueue, list, []), + ?assertMatch([_], Queues), + [Queue] = Queues, + ct:pal("Queue = ~p", [Queue]), + + QName = amqqueue:get_name(Queue), + + %% We kill the node. + rabbit_ct_broker_helpers:kill_node(Config, Node2), + + case rabbit_ct_broker_helpers:configured_metadata_store(Config) of + mnesia -> + clustering_utils:assert_cluster_status({Nodes, Majority}, Majority), + + %% The queue is already deleted from the metadata store on + %% remaining nodes. + lists:foreach( + fun(Node) -> + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, rabbit_amqqueue, lookup, [QName]), + ct:pal("Queue lookup on node ~0p: ~p", [Node, Ret]), + ?assertEqual({error, not_found}, Ret) + end, Majority), + ok; + + khepri -> + clustering_utils:assert_cluster_status({Nodes, Majority}, Majority), + + %% The queue is still recorded on the remaining nodes. + lists:foreach( + fun(Node) -> + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, rabbit_amqqueue, lookup, [QName]), + ct:pal("Queue lookup on node ~0p: ~p", [Node, Ret]), + ?assertEqual({ok, Queue}, Ret) + end, Majority), + + %% We remove the lost node from the cluster. + rabbit_ct_broker_helpers:forget_cluster_node(Config, Node3, Node2), + clustering_utils:assert_cluster_status({Majority, Majority}, Majority), + + %% The queue was deleted from the metadata store on remaining + %% nodes. + lists:foreach( + fun(Node) -> + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, rabbit_amqqueue, lookup, [QName]), + ct:pal("Queue lookup on node ~0p: ~p", [Node, Ret]), + ?assertEqual({error, not_found}, Ret) + end, Majority), + ok + end. 
+ %% ------------------------------------------------------------------- %% Internal utils %% ------------------------------------------------------------------- diff --git a/deps/rabbit/test/rabbit_db_queue_SUITE.erl b/deps/rabbit/test/rabbit_db_queue_SUITE.erl index c80b1fcfba8..9683d06812b 100644 --- a/deps/rabbit/test/rabbit_db_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_db_queue_SUITE.erl @@ -496,7 +496,7 @@ foreach_durable1(_Config) -> QName1 = rabbit_misc:r(?VHOST, queue, <<"test-queue1">>), Q1 = new_queue(QName1, rabbit_classic_queue), ?assertEqual(ok, rabbit_db_queue:set(Q1)), - ?assertEqual(ok, rabbit_db_queue:foreach_durable( + ?assertEqual(ok, rabbit_db_queue:foreach( fun(Q0) -> rabbit_db_queue:internal_delete(amqqueue:get_name(Q0), true, normal) end, @@ -562,7 +562,7 @@ internal_delete1(_Config) -> QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>), Q = new_queue(QName, rabbit_classic_queue), ?assertEqual(ok, rabbit_db_queue:set(Q)), - ?assertEqual(ok, rabbit_db_queue:foreach_durable( + ?assertEqual(ok, rabbit_db_queue:foreach( fun(Q0) -> rabbit_db_queue:internal_delete(amqqueue:get_name(Q0), false, normal) end, fun(Q0) when ?is_amqqueue(Q0) -> true end)),