Skip to content

Commit cb0429e

Browse files
Merge pull request #1787 from rabbitmq/dialyze-qq
Fix dialyzer warnings
2 parents e0ad5b0 + f10083b commit cb0429e

File tree

4 files changed

+79
-14
lines changed

4 files changed

+79
-14
lines changed

src/rabbit_amqqueue.erl

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
-export([list_local_followers/0]).
4545
-export([ensure_rabbit_queue_record_is_initialized/1]).
4646
-export([format/1]).
47+
-export([delete_immediately_by_resource/1]).
4748

4849
-export([pid_of/1, pid_of/2]).
4950
-export([mark_local_durable_queues_stopped/1]).
@@ -266,6 +267,13 @@ filter_per_type(Queues) ->
266267
filter_pid_per_type(QPids) ->
267268
lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids).
268269

270+
filter_resource_per_type(Resources) ->
271+
Queues = [begin
272+
{ok, #amqqueue{pid = QPid}} = lookup(Resource),
273+
{Resource, QPid}
274+
end || Resource <- Resources],
275+
lists:partition(fun({_Resource, QPid}) -> ?IS_CLASSIC(QPid) end, Queues).
276+
269277
stop(VHost) ->
270278
%% Classic queues
271279
ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
@@ -957,7 +965,16 @@ delete_exclusive(QPids, ConnId) ->
957965
delete_immediately(QPids) ->
958966
{Classic, Quorum} = filter_pid_per_type(QPids),
959967
[gen_server2:cast(QPid, delete_immediately) || QPid <- Classic],
960-
[rabbit_quorum_queue:delete_immediately(QPid) || QPid <- Quorum],
968+
case Quorum of
969+
[] -> ok;
970+
_ -> {error, cannot_delete_quorum_queues, Quorum}
971+
end.
972+
973+
delete_immediately_by_resource(Resources) ->
974+
{Classic, Quorum} = filter_resource_per_type(Resources),
975+
[gen_server2:cast(QPid, delete_immediately) || {_, QPid} <- Classic],
976+
[rabbit_quorum_queue:delete_immediately(Resource, QPid)
977+
|| {Resource, QPid} <- Quorum],
961978
ok.
962979

963980
delete(#amqqueue{ type = quorum} = Q,

src/rabbit_quorum_queue.erl

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
-module(rabbit_quorum_queue).
1818

1919
-export([init_state/2, handle_event/2]).
20-
-export([declare/1, recover/1, stop/1, delete/4, delete_immediately/1]).
20+
-export([declare/1, recover/1, stop/1, delete/4, delete_immediately/2]).
2121
-export([info/1, info/2, stat/1, infos/1]).
2222
-export([ack/3, reject/4, basic_get/4, basic_consume/9, basic_cancel/4]).
2323
-export([credit/4]).
@@ -75,7 +75,7 @@
7575
-spec infos(rabbit_types:r('queue')) -> rabbit_types:infos().
7676
-spec stat(rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}.
7777
-spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'.
78-
-spec status(rabbit_types:vhost(), Name :: atom()) -> rabbit_types:infos() | {error, term()}.
78+
-spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> rabbit_types:infos() | {error, term()}.
7979

8080
-define(STATISTICS_KEYS,
8181
[policy,
@@ -94,15 +94,17 @@
9494
%%----------------------------------------------------------------------------
9595

9696
-spec init_state(ra_server_id(), rabbit_types:r('queue')) ->
97-
rabbit_fifo_client:state().
97+
rabbit_fifo_client:state().
9898
init_state({Name, _}, QName) ->
9999
{ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit),
100+
%% This lookup could potentially return an {error, not_found}, but we do not
101+
%% know what to do if the queue has `disappeared`. Let it crash.
100102
{ok, #amqqueue{pid = Leader, quorum_nodes = Nodes}} =
101103
rabbit_amqqueue:lookup(QName),
102104
%% Ensure the leader is listed first
103105
Servers0 = [{Name, N} || N <- Nodes],
104106
Servers = [Leader | lists:delete(Leader, Servers0)],
105-
rabbit_fifo_client:init(QName, Servers, SoftLimit,
107+
rabbit_fifo_client:init(qname_to_rname(QName), Servers, SoftLimit,
106108
fun() -> credit_flow:block(Name), ok end,
107109
fun() -> credit_flow:unblock(Name), ok end).
108110

@@ -305,11 +307,10 @@ delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = Q
305307
end
306308
end.
307309

308-
delete_immediately({Name, _} = QPid) ->
309-
QName = queue_name(Name),
310-
_ = rabbit_amqqueue:internal_delete(QName, ?INTERNAL_USER),
311-
ok = ra:delete_cluster([QPid]),
312-
rabbit_core_metrics:queue_deleted(QName),
310+
delete_immediately(Resource, {_Name, _} = QPid) ->
311+
_ = rabbit_amqqueue:internal_delete(Resource, ?INTERNAL_USER),
312+
{ok, _} = ra:delete_cluster([QPid]),
313+
rabbit_core_metrics:queue_deleted(Resource),
313314
ok.
314315

315316
ack(CTag, MsgIds, QState) ->

src/rabbit_vhost.erl

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,16 @@ delete_storage(VHost) ->
208208
VhostDir = msg_store_dir_path(VHost),
209209
rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]),
210210
%% Message store should be closed when vhost supervisor is closed.
211-
ok = rabbit_file:recursive_delete([VhostDir]).
211+
case rabbit_file:recursive_delete([VhostDir]) of
212+
ok -> ok;
213+
{error, {_, enoent}} ->
214+
%% a concurrent delete did the job for us
215+
rabbit_log:warning("Tried to delete storage directories for vhost '~s', it failed with an ENOENT", [VHost]),
216+
ok;
217+
Other ->
218+
rabbit_log:warning("Tried to delete storage directories for vhost '~s': ~p", [VHost, Other]),
219+
Other
220+
end.
212221

213222
assert_benign(ok, _) -> ok;
214223
assert_benign({ok, _}, _) -> ok;

test/quorum_queue_SUITE.erl

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ all_tests() ->
116116
basic_recover,
117117
idempotent_recover,
118118
vhost_with_quorum_queue_is_deleted,
119+
delete_immediately,
120+
delete_immediately_by_resource,
119121
consume_redelivery_count,
120122
subscribe_redelivery_count
121123
].
@@ -1914,6 +1916,42 @@ basic_recover(Config) ->
19141916
wait_for_messages_ready(Servers, RaName, 1),
19151917
wait_for_messages_pending_ack(Servers, RaName, 0).
19161918

1919+
delete_immediately(Config) ->
1920+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
1921+
1922+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
1923+
QQ = ?config(queue_name, Config),
1924+
Args = [{<<"x-queue-type">>, longstr, <<"quorum">>}],
1925+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
1926+
declare(Ch, QQ, Args)),
1927+
1928+
Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QQ) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."],
1929+
{ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd),
1930+
?assertEqual(match, re:run(Msg, ".*error.*", [{capture, none}])),
1931+
1932+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
1933+
amqp_channel:call(Ch, #'queue.declare'{queue = QQ,
1934+
durable = true,
1935+
passive = true,
1936+
auto_delete = false,
1937+
arguments = Args})).
1938+
1939+
delete_immediately_by_resource(Config) ->
1940+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
1941+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
1942+
QQ = ?config(queue_name, Config),
1943+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
1944+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
1945+
Cmd2 = ["eval", "rabbit_amqqueue:delete_immediately_by_resource([rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QQ) ++ "\">>)])."],
1946+
?assertEqual({ok, "ok\n"}, rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd2)),
1947+
1948+
%% Check that the application and process are down
1949+
wait_until(fun() ->
1950+
[] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
1951+
end),
1952+
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
1953+
rpc:call(Server, application, which_applications, []))).
1954+
19171955
subscribe_redelivery_count(Config) ->
19181956
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
19191957

@@ -1967,7 +2005,6 @@ consume_redelivery_count(Config) ->
19672005
QQ = ?config(queue_name, Config),
19682006
?assertEqual({'queue.declare_ok', QQ, 0, 0},
19692007
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
1970-
19712008
RaName = ra_name(QQ),
19722009
publish(Ch, QQ),
19732010
wait_for_messages_ready(Servers, RaName, 1),
@@ -1984,7 +2021,9 @@ consume_redelivery_count(Config) ->
19842021
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
19852022
multiple = false,
19862023
requeue = true}),
1987-
2024+
%% wait for requeueing
2025+
timer:sleep(500),
2026+
19882027
{#'basic.get_ok'{delivery_tag = DeliveryTag1,
19892028
redelivered = true},
19902029
#amqp_msg{props = #'P_basic'{headers = H1}}} =
@@ -2005,7 +2044,6 @@ consume_redelivery_count(Config) ->
20052044
multiple = false,
20062045
requeue = true}).
20072046

2008-
20092047
%%----------------------------------------------------------------------------
20102048

20112049
declare(Ch, Q) ->

0 commit comments

Comments
 (0)