Skip to content

Commit dd66a56

Browse files
committed
Keep exclusive queues with Khepri + network partition
[Why] With Mnesia, when the network partition strategy is set to `pause_minority`, nodes on the "minority side" are stopped. Thus, the exclusive queues that were hosted by nodes on that minority side are lost: * Consumers connected on these nodes are disconnected because the nodes are stopped. * Queue records on the majority side are deleted from the metadata store. This was ok with Mnesia and how this network partition handling strategy is implemented. However, it does not work with Khepri because the nodes on the "minority side" continue to run and serve clients. Therefore the cluster ends up in a weird situation: 1. The "majority side" deleted the queue records. 2. When the network partition is solved, the "minority side" gets the record deletion, but the queue processes continue to run. [How] With Khepri, we stop to delete transient queue records in general, just because there is a node going down. Thanks to this, an exclusive queue and its consumer are not affected by a network partition: they continue to work. However, if a node is really lost, we need to clean up dead queue records. This was already done for durable queues with both Mnesia and Khepri. But with Khepri, transient queue records persist in the store like durable queue records (unlike with Mnesia). That's why this commit changes the clean-up function, `rabbit_amqqueue:forget_all_durable/1` into `rabbit_amqqueue:forget_all/1` which deletes all queue records of queues that were hosted on the given node, regardless if they are transient or durable. Fixes #12949, #12597.
1 parent b3a58b8 commit dd66a56

File tree

5 files changed

+280
-40
lines changed

5 files changed

+280
-40
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
-export([recover/1, stop/1, start/1, declare/6, declare/7,
1111
delete_immediately/1, delete_exclusive/2, delete/4, purge/1,
12-
forget_all_durable/1]).
12+
forget_all/1]).
1313
-export([pseudo_queue/2, pseudo_queue/3]).
1414
-export([exists/1, lookup/1, lookup/2, lookup_many/1, lookup_durable_queue/1,
1515
not_found_or_absent_dirty/1,
@@ -1890,19 +1890,19 @@ internal_delete(Queue, ActingUser, Reason) ->
18901890
{user_who_performed_action, ActingUser}])
18911891
end.
18921892

1893-
-spec forget_all_durable(node()) -> 'ok'.
1893+
-spec forget_all(node()) -> 'ok'.
18941894

1895-
%% TODO this is used by `rabbit_mnesia:remove_node_if_mnesia_running`
1896-
%% Does it make any sense once mnesia is not used/removed?
1897-
forget_all_durable(Node) ->
1898-
?LOG_INFO("Will remove all classic queues from node ~ts. The node is likely being removed from the cluster.", [Node]),
1895+
%% This is used by `rabbit_mnesia:remove_node_if_mnesia_running/1' and
1896+
%% `rabbit_khepri:remove_*_member/1'.
1897+
forget_all(Node) ->
1898+
?LOG_INFO("Will remove all queues from node ~ts. The node is likely being removed from the cluster.", [Node]),
18991899
UpdateFun = fun(Q) ->
19001900
forget_node_for_queue(Q)
19011901
end,
19021902
FilterFun = fun(Q) ->
19031903
is_local_to_node(amqqueue:get_pid(Q), Node)
19041904
end,
1905-
rabbit_db_queue:foreach_durable(UpdateFun, FilterFun).
1905+
rabbit_db_queue:foreach(UpdateFun, FilterFun).
19061906

19071907
forget_node_for_queue(Q)
19081908
when ?amqqueue_is_quorum(Q) ->
@@ -1950,21 +1950,17 @@ on_node_up(_Node) ->
19501950
-spec on_node_down(node()) -> 'ok'.
19511951

19521952
on_node_down(Node) ->
1953-
case delete_transient_queues_on_node(Node) of
1954-
ok ->
1953+
case rabbit_khepri:is_enabled() of
1954+
true ->
1955+
%% With Khepri, we don't delete transient/exclusive queues. There
1956+
%% may be a network partition and the node will be reachable again
1957+
%% after the partition is repaired.
1958+
%%
1959+
%% If the node will never come back, it will likely be removed from
1960+
%% the cluster. We take care of transient queues at that time.
19551961
ok;
1956-
{error, timeout} ->
1957-
%% This case is possible when running Khepri. The node going down
1958-
%% could leave the cluster in a minority so the command to delete
1959-
%% the transient queue records would fail. Also see
1960-
%% `rabbit_khepri:init/0': we also try this deletion when the node
1961-
%% restarts - a time that the cluster is very likely to have a
1962-
%% majority - to ensure these records are deleted.
1963-
?LOG_WARNING("transient queues for node '~ts' could not be "
1964-
"deleted because of a timeout. These queues "
1965-
"will be removed when node '~ts' restarts or "
1966-
"is removed from the cluster.", [Node, Node]),
1967-
ok
1962+
false ->
1963+
ok = delete_transient_queues_on_node(Node)
19681964
end.
19691965

19701966
-spec delete_transient_queues_on_node(Node) -> Ret when

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
delete/2,
3333
update/2,
3434
update_decorators/2,
35-
exists/1
35+
exists/1,
36+
foreach/2
3637
]).
3738

3839
%% Once mnesia is removed, all transient entities will be deleted. These can be replaced
@@ -57,8 +58,7 @@
5758
%% Only used by rabbit_amqqueue:forget_node_for_queue, which is only called
5859
%% by `rabbit_mnesia:remove_node_if_mnesia_running`. Thus, once mnesia and/or
5960
%% HA queues are removed it can be deleted.
60-
-export([foreach_durable/2,
61-
internal_delete/3]).
61+
-export([internal_delete/3]).
6262

6363
%% Storing it on Khepri is not needed, this function is just used in
6464
%% rabbit_quorum_queue to ensure the queue is present in the rabbit_queue
@@ -1250,20 +1250,26 @@ foreach_transient_in_khepri(UpdateFun) ->
12501250
end.
12511251

12521252
%% -------------------------------------------------------------------
1253-
%% foreach_durable().
1253+
%% foreach().
12541254
%% -------------------------------------------------------------------
12551255

1256-
-spec foreach_durable(UpdateFun, FilterFun) -> ok when
1256+
-spec foreach(UpdateFun, FilterFun) -> ok when
12571257
UpdateFun :: fun((Queue) -> any()),
12581258
FilterFun :: fun((Queue) -> boolean()).
1259-
%% @doc Applies `UpdateFun' to all durable queue records that match `FilterFun'.
1259+
%% @doc Applies `UpdateFun' to all queue records that match `FilterFun'.
1260+
%%
1261+
%% With Mnesia, only durable queues are considered because we use the durable
1262+
%% queues table.
1263+
%%
1264+
%% With Khepri, all queues are considered because they are all in the same
1265+
%% "table".
12601266
%%
12611267
%% @private
12621268

1263-
foreach_durable(UpdateFun, FilterFun) ->
1269+
foreach(UpdateFun, FilterFun) ->
12641270
rabbit_khepri:handle_fallback(
12651271
#{mnesia => fun() -> foreach_durable_in_mnesia(UpdateFun, FilterFun) end,
1266-
khepri => fun() -> foreach_durable_in_khepri(UpdateFun, FilterFun) end
1272+
khepri => fun() -> foreach_in_khepri(UpdateFun, FilterFun) end
12671273
}).
12681274

12691275
foreach_durable_in_mnesia(UpdateFun, FilterFun) ->
@@ -1279,11 +1285,8 @@ foreach_durable_in_mnesia(UpdateFun, FilterFun) ->
12791285
end),
12801286
ok.
12811287

1282-
foreach_durable_in_khepri(UpdateFun, FilterFun) ->
1283-
Path = khepri_queue_path(
1284-
?KHEPRI_WILDCARD_STAR,
1285-
#if_data_matches{
1286-
pattern = amqqueue:pattern_match_on_durable(true)}),
1288+
foreach_in_khepri(UpdateFun, FilterFun) ->
1289+
Path = khepri_queue_path(?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
12871290
case rabbit_khepri:filter(Path, fun(_, #{data := Q}) ->
12881291
FilterFun(Q)
12891292
end) of

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -670,7 +670,7 @@ remove_reachable_member(NodeToRemove) ->
670670
NodeToRemove, khepri_cluster, reset, [?RA_CLUSTER_NAME]),
671671
case Ret of
672672
ok ->
673-
rabbit_amqqueue:forget_all_durable(NodeToRemove),
673+
rabbit_amqqueue:forget_all(NodeToRemove),
674674
?LOG_DEBUG(
675675
"Node ~s removed from Khepri cluster \"~s\"",
676676
[NodeToRemove, ?RA_CLUSTER_NAME],
@@ -692,7 +692,7 @@ remove_down_member(NodeToRemove) ->
692692
Ret = ra:remove_member(ServerRef, ServerId, Timeout),
693693
case Ret of
694694
{ok, _, _} ->
695-
rabbit_amqqueue:forget_all_durable(NodeToRemove),
695+
rabbit_amqqueue:forget_all(NodeToRemove),
696696
?LOG_DEBUG(
697697
"Node ~s removed from Khepri cluster \"~s\"",
698698
[NodeToRemove, ?RA_CLUSTER_NAME],

deps/rabbit/src/rabbit_mnesia.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -916,7 +916,7 @@ remove_node_if_mnesia_running(Node) ->
916916
case mnesia:del_table_copy(schema, Node) of
917917
{atomic, ok} ->
918918
rabbit_node_monitor:notify_left_cluster(Node),
919-
rabbit_amqqueue:forget_all_durable(Node),
919+
rabbit_amqqueue:forget_all(Node),
920920
ok;
921921
{aborted, Reason} ->
922922
{error, {failed_to_remove_node, Node, Reason}}

0 commit comments

Comments
 (0)