2929-export ([not_found /1 , absent /2 ]).
3030-export ([list /0 , list /1 , info_keys /0 , info /1 , info /2 , info_all /1 , info_all /2 ,
3131 emit_info_all /5 , list_local /1 , info_local /1 ,
32- emit_info_local /4 , emit_info_down /4 ]).
32+ emit_info_local /4 , emit_info_down /4 ]).
3333-export ([list_down /1 , count /1 , list_names /0 , list_names /1 , list_local_names /0 ,
3434 list_with_possible_retry /1 ]).
3535-export ([list_by_type /1 ]).
8181-type msg_id () :: non_neg_integer ().
8282-type ok_or_errors () ::
8383 'ok' | {'error' , [{'error' | 'exit' | 'throw' , any ()}]}.
84- -type absent_reason () :: 'nodedown' | 'crashed' .
85- -type queue_or_absent () :: amqqueue : amqqueue () |
86- {'absent' , amqqueue :amqqueue (),absent_reason ()}.
87- -type not_found_or_absent () ::
88- 'not_found' | { 'absent' , amqqueue : amqqueue (), absent_reason ()}.
84+ -type absent_reason () :: 'nodedown' | 'crashed' | stopped | timeout .
85+ -type queue_not_found () :: not_found .
86+ - type queue_absent () :: {'absent' , amqqueue :amqqueue (), absent_reason ()}.
87+ -type not_found_or_absent () :: queue_not_found () | queue_absent ().
88+ - type quorum_states () :: #{ Name :: atom () => rabbit_fifo_client : state ()}.
8989
9090% %----------------------------------------------------------------------------
9191
@@ -231,13 +231,14 @@ recover_durable_queues(QueuesAndRecoveryTerms) ->
231231 [Pid , Error ]) || {Pid , Error } <- Failures ],
232232 [Q || {_ , {new , Q }} <- Results ].
233233
234- - spec declare
235- (name (), boolean (), boolean (), rabbit_framing :amqp_table (),
236- rabbit_types :maybe (pid ()), rabbit_types :username ()) ->
237- {'new' | 'existing' | 'absent' | 'owner_died' ,
238- amqqueue :amqqueue ()} |
239- {'new' , amqqueue :amqqueue (), rabbit_fifo_client :state ()} |
240- rabbit_types :channel_exit ().
234+ - spec declare (name (),
235+ boolean (),
236+ boolean (),
237+ rabbit_framing :amqp_table (),
238+ rabbit_types :maybe (pid ()),
239+ rabbit_types :username ()) ->
240+ {'new' | 'existing' | 'absent' | 'owner_died' , amqqueue :amqqueue ()} |
241+ rabbit_types :channel_exit ().
241242
242243declare (QueueName , Durable , AutoDelete , Args , Owner , ActingUser ) ->
243244 declare (QueueName , Durable , AutoDelete , Args , Owner , ActingUser , node ()).
@@ -247,13 +248,17 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser) ->
247248% % should be. Note that in some cases (e.g. with "nodes" policy in
248249% % effect) this might not be possible to satisfy.
249250
250- - spec declare
251- (name (), boolean (), boolean (), rabbit_framing :amqp_table (),
252- rabbit_types :maybe (pid ()), rabbit_types :username (), node ()) ->
253- {'new' | 'existing' | 'owner_died' , amqqueue :amqqueue ()} |
254- {'new' , amqqueue :amqqueue (), rabbit_fifo_client :state ()} |
255- {'absent' , amqqueue :amqqueue (), absent_reason ()} |
256- rabbit_types :channel_exit ().
251+ - spec declare (name (),
252+ boolean (),
253+ boolean (),
254+ rabbit_framing :amqp_table (),
255+ rabbit_types :maybe (pid ()),
256+ rabbit_types :username (),
257+ node ()) ->
258+ {'new' | 'existing' | 'owner_died' , amqqueue :amqqueue ()} |
259+ {'new' , amqqueue :amqqueue (), rabbit_fifo_client :state ()} |
260+ {'absent' , amqqueue :amqqueue (), absent_reason ()} |
261+ rabbit_types :channel_exit ().
257262
258263declare (QueueName = # resource {virtual_host = VHost }, Durable , AutoDelete , Args ,
259264 Owner , ActingUser , Node ) ->
@@ -317,7 +322,7 @@ get_queue_type(Args) ->
317322 end .
318323
319324- spec internal_declare (amqqueue :amqqueue (), boolean ()) ->
320- queue_or_absent () | rabbit_misc : thunk ( queue_or_absent () ).
325+ { created | existing , amqqueue : amqqueue ()} | queue_absent ( ).
321326
322327internal_declare (Q , Recover ) ->
323328 ? try_mnesia_tx_or_upgrade_amqqueue_and_retry (
@@ -451,6 +456,8 @@ not_found_or_absent(Name) ->
451456 [Q ] -> {absent , Q , nodedown } % % Q exists on stopped node
452457 end .
453458
459+ - spec not_found_or_absent_dirty (name ()) -> not_found_or_absent ().
460+
454461not_found_or_absent_dirty (Name ) ->
455462 % % We should read from both tables inside a tx, to get a
456463 % % consistent view. But the chances of an inconsistency are small,
@@ -460,12 +467,15 @@ not_found_or_absent_dirty(Name) ->
460467 {ok , Q } -> {absent , Q , nodedown }
461468 end .
462469
463- - spec with (name (), qfun (A ), fun ((not_found_or_absent ()) -> B )) -> A | B .
470+ - spec with (name (),
471+ qfun (A ),
472+ fun ((not_found_or_absent ()) -> rabbit_types :channel_exit ())) ->
473+ A | rabbit_types :channel_exit ().
464474
465475with (Name , F , E ) ->
466476 with (Name , F , E , 2000 ).
467477
468- with (Name , F , E , RetriesLeft ) ->
478+ with (# resource {} = Name , F , E , RetriesLeft ) ->
469479 case lookup (Name ) of
470480 {ok , Q } when ? amqqueue_state_is (Q , live ) andalso RetriesLeft =:= 0 ->
471481 % % Something bad happened to that queue, we are bailing out
@@ -502,6 +512,12 @@ with(Name, F, E, RetriesLeft) ->
502512 E (not_found_or_absent_dirty (Name ))
503513 end .
504514
515+ - spec retry_wait (amqqueue :amqqueue (),
516+ qfun (A ),
517+ fun ((not_found_or_absent ()) -> rabbit_types :channel_exit ()),
518+ non_neg_integer ()) ->
519+ A | rabbit_types :channel_exit ().
520+
505521retry_wait (Q , F , E , RetriesLeft ) ->
506522 Name = amqqueue :get_name (Q ),
507523 QPid = amqqueue :get_pid (Q ),
@@ -535,23 +551,32 @@ with(Name, F) -> with(Name, F, fun (E) -> {error, E} end).
535551- spec with_or_die (name (), qfun (A )) -> A | rabbit_types :channel_exit ().
536552
537553with_or_die (Name , F ) ->
538- with (Name , F , fun (not_found ) -> not_found (Name );
539- ({absent , Q , Reason }) -> absent (Q , Reason )
540- end ).
554+ with (Name , F , die_fun (Name )).
555+
556+ - spec die_fun (name ()) ->
557+ fun ((not_found_or_absent ()) -> rabbit_types :channel_exit ()).
558+
559+ die_fun (Name ) ->
560+ fun (not_found ) -> not_found (Name );
561+ ({absent , Q , Reason }) -> absent (Q , Reason )
562+ end .
541563
542- - spec not_found (rabbit_types : r ( atom () )) -> rabbit_types :channel_exit ().
564+ - spec not_found (name ( )) -> rabbit_types :channel_exit ().
543565
544566not_found (R ) -> rabbit_misc :protocol_error (not_found , " no ~s " , [rabbit_misc :rs (R )]).
545567
546- - spec absent (amqqueue :amqqueue (), rabbit_amqqueue : absent_reason ()) ->
547- rabbit_types :channel_exit ().
568+ - spec absent (amqqueue :amqqueue (), absent_reason ()) ->
569+ rabbit_types :channel_exit ().
548570
549571absent (Q , AbsentReason ) ->
550572 QueueName = amqqueue :get_name (Q ),
551573 QPid = amqqueue :get_pid (Q ),
552574 IsDurable = amqqueue :is_durable (Q ),
553575 priv_absent (QueueName , QPid , IsDurable , AbsentReason ).
554576
577+ - spec priv_absent (name (), pid (), boolean (), absent_reason ()) ->
578+ rabbit_types :channel_exit ().
579+
555580priv_absent (QueueName , QPid , true , nodedown ) ->
556581 % % The assertion of durability is mainly there because we mention
557582 % % durability in the error message. That way we will hopefully
@@ -1006,7 +1031,7 @@ notify_policy_changed(Q) when ?amqqueue_is_quorum(Q) ->
10061031
10071032- spec consumers (amqqueue :amqqueue ()) ->
10081033 [{pid (), rabbit_types :ctag (), boolean (), non_neg_integer (),
1009- rabbit_framing :amqp_table ()}].
1034+ rabbit_framing :amqp_table (), rabbit_types : username () }].
10101035
10111036consumers (Q ) when ? amqqueue_is_classic (Q ) ->
10121037 QPid = amqqueue :get_pid (Q ),
@@ -1189,7 +1214,11 @@ purge(Q) when ?amqqueue_is_quorum(Q) ->
11891214 NodeId = amqqueue :get_pid (Q ),
11901215 rabbit_quorum_queue :purge (NodeId ).
11911216
1192- - spec requeue (pid (), [msg_id ()], pid (), #{Name :: atom () => rabbit_fifo_client :state ()}) -> 'ok' .
1217+ - spec requeue (pid (),
1218+ {rabbit_fifo :consumer_tag (), [msg_id ()]},
1219+ pid (),
1220+ quorum_states ()) ->
1221+ 'ok' .
11931222
11941223requeue (QPid , {_ , MsgIds }, ChPid , QuorumStates ) when ? IS_CLASSIC (QPid ) ->
11951224 ok = delegate :invoke (QPid , {gen_server2 , call , [{requeue , MsgIds , ChPid }, infinity ]}),
@@ -1205,7 +1234,11 @@ requeue({Name, _} = QPid, {CTag, MsgIds}, _ChPid, QuorumStates)
12051234 QuorumStates
12061235 end .
12071236
1208- - spec ack (pid (), [msg_id ()], pid (), #{Name :: atom () => rabbit_fifo_client :state ()}) -> 'ok' .
1237+ - spec ack (pid (),
1238+ {rabbit_fifo :consumer_tag (), [msg_id ()]},
1239+ pid (),
1240+ quorum_states ()) ->
1241+ quorum_states ().
12091242
12101243ack (QPid , {_ , MsgIds }, ChPid , QueueStates ) when ? IS_CLASSIC (QPid ) ->
12111244 delegate :invoke_no_result (QPid , {gen_server2 , cast , [{ack , MsgIds , ChPid }]}),
@@ -1221,8 +1254,12 @@ ack({Name, _} = QPid, {CTag, MsgIds}, _ChPid, QuorumStates)
12211254 QuorumStates
12221255 end .
12231256
1224- - spec reject (pid () | {atom (), node ()}, [msg_id ()], boolean (), pid (),
1225- #{Name :: atom () => rabbit_fifo_client :state ()}) -> 'ok' .
1257+ - spec reject (pid () | amqqueue :ra_server_id (),
1258+ boolean (),
1259+ {rabbit_fifo :consumer_tag (), [msg_id ()]},
1260+ pid (),
1261+ quorum_states ()) ->
1262+ quorum_states ().
12261263
12271264reject (QPid , Requeue , {_ , MsgIds }, ChPid , QStates ) when ? IS_CLASSIC (QPid ) ->
12281265 ok = delegate :invoke_no_result (QPid , {gen_server2 , cast ,
@@ -1265,17 +1302,20 @@ notify_down_all(QPids, ChPid, Timeout) ->
12651302 Error -> {error , Error }
12661303 end .
12671304
1268- - spec activate_limit_all (qpids (), pid ()) -> ok_or_errors () .
1305+ - spec activate_limit_all (qpids (), pid ()) -> ok .
12691306
12701307activate_limit_all (QRefs , ChPid ) ->
12711308 QPids = [P || P <- QRefs , ? IS_CLASSIC (P )],
12721309 delegate :invoke_no_result (QPids , {gen_server2 , cast ,
12731310 [{activate_limit , ChPid }]}).
12741311
1275- - spec credit
1276- (amqqueue :amqqueue (), pid (), rabbit_types :ctag (), non_neg_integer (),
1277- boolean (), #{Name :: atom () => rabbit_fifo_client :state ()}) ->
1278- 'ok' .
1312+ - spec credit (amqqueue :amqqueue (),
1313+ pid (),
1314+ rabbit_types :ctag (),
1315+ non_neg_integer (),
1316+ boolean (),
1317+ quorum_states ()) ->
1318+ {'ok' , quorum_states ()}.
12791319
12801320credit (Q , ChPid , CTag , Credit ,
12811321 Drain , QStates ) when ? amqqueue_is_classic (Q ) ->
@@ -1294,7 +1334,9 @@ credit(Q,
12941334
12951335- spec basic_get (amqqueue :amqqueue (), pid (), boolean (), pid (), rabbit_types :ctag (),
12961336 #{Name :: atom () => rabbit_fifo_client :state ()}) ->
1297- {'ok' , non_neg_integer (), qmsg ()} | 'empty' .
1337+ {'ok' , non_neg_integer (), qmsg (), quorum_states ()} |
1338+ {'empty' , quorum_states ()} |
1339+ rabbit_types :channel_exit ().
12981340
12991341basic_get (Q , ChPid , NoAck , LimiterPid , _CTag , _ )
13001342 when ? amqqueue_is_classic (Q ) ->
@@ -1421,10 +1463,7 @@ internal_delete1(QueueName, OnlyDurable, Reason) ->
14211463 % % after the transaction.
14221464 rabbit_binding :remove_for_destination (QueueName , OnlyDurable ).
14231465
1424- - spec internal_delete (name (), rabbit_types :username ()) ->
1425- 'ok' | rabbit_types :connection_exit () |
1426- fun (() ->
1427- 'ok' | rabbit_types :connection_exit ()).
1466+ - spec internal_delete (name (), rabbit_types :username ()) -> 'ok' .
14281467
14291468internal_delete (QueueName , ActingUser ) ->
14301469 internal_delete (QueueName , ActingUser , normal ).
@@ -1682,7 +1721,9 @@ pseudo_queue(QueueName, Pid) ->
16821721
16831722- spec pseudo_queue (name (), pid (), boolean ()) -> amqqueue :amqqueue ().
16841723
1685- pseudo_queue (QueueName , Pid , Durable ) ->
1724+ pseudo_queue (# resource {kind = queue } = QueueName , Pid , Durable )
1725+ when is_pid (Pid ) andalso
1726+ is_boolean (Durable ) ->
16861727 amqqueue :new (QueueName ,
16871728 Pid ,
16881729 Durable ,
@@ -1704,8 +1745,12 @@ deliver(Qs, Delivery) ->
17041745 deliver (Qs , Delivery , untracked ),
17051746 ok .
17061747
1707- - spec deliver ([amqqueue :amqqueue ()], rabbit_types :delivery (), #{Name :: atom () => rabbit_fifo_client :state ()} | 'untracked' ) ->
1708- {qpids (), #{Name :: atom () => rabbit_fifo_client :state ()}}.
1748+ - spec deliver ([amqqueue :amqqueue ()],
1749+ rabbit_types :delivery (),
1750+ quorum_states () | 'untracked' ) ->
1751+ {qpids (),
1752+ [{amqqueue :ra_server_id (), name ()}],
1753+ quorum_states ()}.
17091754
17101755deliver ([], _Delivery , QueueState ) ->
17111756 % % /dev/null optimisation
0 commit comments