Skip to content

Commit 0d0f39d

Browse files
committed
Recover bindings for all durable queues including failed to recover.
If a queue fails to recover it may still be restarted by the supervisor and eventually start. After that some bindings may be in rabbit_durable_route but not rabbit_route. This can cause binding not found errors. If bindings are recovered for failed queues, the behaviour will be the same as for the crashed queues. (which is currently broken but needs to be fixed separately) Addresses #1873 [#163919158]
1 parent b0dfe93 commit 0d0f39d

File tree

4 files changed

+21
-13
lines changed

4 files changed

+21
-13
lines changed

src/rabbit_amqqueue.erl

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,16 @@ warn_file_limit() ->
108108
ok
109109
end.
110110

111-
-spec recover(rabbit_types:vhost()) -> [amqqueue:amqqueue()].
111+
-spec recover(rabbit_types:vhost()) ->
112+
{ClassicOk :: [amqqueue:amqqueue()],
113+
ClassicFailed :: [amqqueue:amqqueue()],
114+
Quorum :: [amqqueue:amqqueue()]}.
112115

113116
recover(VHost) ->
114117
Classic = find_local_durable_classic_queues(VHost),
115118
Quorum = find_local_quorum_queues(VHost),
116-
recover_classic_queues(VHost, Classic) ++ rabbit_quorum_queue:recover(Quorum).
119+
{ClassicOk, ClassicFailed} = recover_classic_queues(VHost, Classic),
120+
{ClassicOk, ClassicFailed, rabbit_quorum_queue:recover(Quorum)}.
117121

118122
recover_classic_queues(VHost, Queues) ->
119123
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
@@ -124,15 +128,16 @@ recover_classic_queues(VHost, Queues) ->
124128
BQ:start(VHost, [amqqueue:get_name(Q) || Q <- Queues]),
125129
case rabbit_amqqueue_sup_sup:start_for_vhost(VHost) of
126130
{ok, _} ->
127-
recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms));
131+
OkQueues = recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)),
132+
OkQueuesNames = [amqqueue:get_name(Q) || Q <- OkQueues],
133+
FailedQueues = [Q || Q <- Queues,
134+
not lists:member(amqqueue:get_name(Q), OkQueuesNames)],
135+
{OkQueues, FailedQueues};
128136
{error, Reason} ->
129137
rabbit_log:error("Failed to start queue supervisor for vhost '~s': ~s", [VHost, Reason]),
130138
throw({error, Reason})
131139
end.
132140

133-
filter_per_type(Queues) ->
134-
lists:partition(fun(Q) -> amqqueue:is_classic(Q) end, Queues).
135-
136141
filter_pid_per_type(QPids) ->
137142
lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids).
138143

@@ -156,12 +161,14 @@ stop(VHost) ->
156161
-spec start([amqqueue:amqqueue()]) -> 'ok'.
157162

158163
start(Qs) ->
159-
{Classic, _Quorum} = filter_per_type(Qs),
160164
%% At this point all recovered queues and their bindings are
161165
%% visible to routing, so now it is safe for them to complete
162166
%% their initialisation (which may involve interacting with other
163167
%% queues).
164-
_ = [amqqueue:get_pid(Q) ! {self(), go} || Q <- Classic],
168+
_ = [amqqueue:get_pid(Q) ! {self(), go}
169+
|| Q <- Qs,
170+
%% All queues are supposed to be classic here.
171+
amqqueue:is_classic(Q)],
165172
ok.
166173

167174
mark_local_durable_queues_stopped(VHost) ->

src/rabbit_quorum_queue.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,8 +286,7 @@ reductions(Name) ->
286286
0
287287
end.
288288

289-
-spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue() |
290-
{'absent', amqqueue:amqqueue(), atom()}].
289+
-spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue()].
291290

292291
recover(Queues) ->
293292
[begin

src/rabbit_vhost.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,11 @@ recover(VHost) ->
5353
VHostStubFile = filename:join(VHostDir, ".vhost"),
5454
ok = rabbit_file:ensure_dir(VHostStubFile),
5555
ok = file:write_file(VHostStubFile, VHost),
56-
Qs = rabbit_amqqueue:recover(VHost),
56+
{ClassicOk, ClassicFailed, Quorum} = rabbit_amqqueue:recover(VHost),
57+
Qs = ClassicOk ++ ClassicFailed ++ Quorum,
5758
QNames = [amqqueue:get_name(Q) || Q <- Qs],
5859
ok = rabbit_binding:recover(rabbit_exchange:recover(VHost), QNames),
59-
ok = rabbit_amqqueue:start(Qs),
60+
ok = rabbit_amqqueue:start(ClassicOk),
6061
%% Start queue mirrors.
6162
ok = rabbit_mirror_queue_misc:on_vhost_up(VHost),
6263
ok.

test/backing_queue_SUITE.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -733,7 +733,8 @@ bq_queue_recover1(Config) ->
733733
after 10000 -> exit(timeout_waiting_for_queue_death)
734734
end,
735735
rabbit_amqqueue:stop(?VHOST),
736-
rabbit_amqqueue:start(rabbit_amqqueue:recover(?VHOST)),
736+
{Recovered, [], []} = rabbit_amqqueue:recover(?VHOST),
737+
rabbit_amqqueue:start(Recovered),
737738
{ok, Limiter} = rabbit_limiter:start_link(no_id),
738739
rabbit_amqqueue:with_or_die(
739740
QName,

0 commit comments

Comments
 (0)