161161 queue_cleanup_timer
162162}).
163163
164+ -define (QUEUE , lqueue ).
164165
165166-define (MAX_PERMISSION_CACHE_SIZE , 12 ).
166167
@@ -339,12 +340,29 @@ list_local() ->
339340info_keys () -> ? INFO_KEYS .
340341
341342info (Pid ) ->
342- gen_server2 :call (Pid , info , infinity ).
343+ {Timeout , Deadline } = get_operation_timeout_and_deadline (),
344+ try
345+ case gen_server2 :call (Pid , {info , Deadline }, Timeout ) of
346+ {ok , Res } -> Res ;
347+ {error , Error } -> throw (Error )
348+ end
349+ catch
350+ exit :{timeout , _ } ->
351+ rabbit_log :error (" Timed out getting channel ~p info" , [Pid ]),
352+ throw (timeout )
353+ end .
343354
344355info (Pid , Items ) ->
345- case gen_server2 :call (Pid , {info , Items }, infinity ) of
346- {ok , Res } -> Res ;
347- {error , Error } -> throw (Error )
356+ {Timeout , Deadline } = get_operation_timeout_and_deadline (),
357+ try
358+ case gen_server2 :call (Pid , {{info , Items }, Deadline }, Timeout ) of
359+ {ok , Res } -> Res ;
360+ {error , Error } -> throw (Error )
361+ end
362+ catch
363+ exit :{timeout , _ } ->
364+ rabbit_log :error (" Timed out getting channel ~p info" , [Pid ]),
365+ throw (timeout )
348366 end .
349367
350368info_all () ->
@@ -433,7 +451,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
433451 limiter = Limiter ,
434452 tx = none ,
435453 next_tag = 1 ,
436- unacked_message_q = queue :new (),
454+ unacked_message_q = ? QUEUE :new (),
437455 user = User ,
438456 virtual_host = VHost ,
439457 most_recently_declared_queue = <<>>,
@@ -493,13 +511,20 @@ prioritise_info(Msg, _Len, _State) ->
493511handle_call (flush , _From , State ) ->
494512 reply (ok , State );
495513
496- handle_call (info , _From , State ) ->
497- reply (infos (? INFO_KEYS , State ), State );
514+ handle_call ({info , Deadline }, _From , State ) ->
515+ try
516+ reply ({ok , infos (? INFO_KEYS , Deadline , State )}, State )
517+ catch
518+ Error ->
519+ reply ({error , Error }, State )
520+ end ;
498521
499- handle_call ({info , Items }, _From , State ) ->
522+ handle_call ({{ info , Items }, Deadline }, _From , State ) ->
500523 try
501- reply ({ok , infos (Items , State )}, State )
502- catch Error -> reply ({error , Error }, State )
524+ reply ({ok , infos (Items , Deadline , State )}, State )
525+ catch
526+ Error ->
527+ reply ({error , Error }, State )
503528 end ;
504529
505530handle_call (refresh_config , _From , State = # ch {virtual_host = VHost }) ->
@@ -1181,7 +1206,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
11811206 DQ = {Delivery # delivery {flow = Flow }, QNames },
11821207 {noreply , case Tx of
11831208 none -> deliver_to_queues (DQ , State1 );
1184- {Msgs , Acks } -> Msgs1 = queue :in (DQ , Msgs ),
1209+ {Msgs , Acks } -> Msgs1 = ? QUEUE :in (DQ , Msgs ),
11851210 State1 # ch {tx = {Msgs1 , Acks }}
11861211 end };
11871212 {error , Reason } ->
@@ -1389,10 +1414,10 @@ handle_method(#'basic.qos'{global = true,
13891414handle_method (# 'basic.qos' {global = true ,
13901415 prefetch_count = PrefetchCount },
13911416 _ , State = # ch {limiter = Limiter , unacked_message_q = UAMQ }) ->
1392- % % TODO queue :len(UAMQ) is not strictly right since that counts
1417+ % % TODO ?QUEUE :len(UAMQ) is not strictly right since that counts
13931418 % % unacked messages from basic.get too. Pretty obscure though.
13941419 Limiter1 = rabbit_limiter :limit_prefetch (Limiter ,
1395- PrefetchCount , queue :len (UAMQ )),
1420+ PrefetchCount , ? QUEUE :len (UAMQ )),
13961421 case ((not rabbit_limiter :is_active (Limiter )) andalso
13971422 rabbit_limiter :is_active (Limiter1 )) of
13981423 true -> rabbit_amqqueue :activate_limit_all (
@@ -1405,7 +1430,7 @@ handle_method(#'basic.recover_async'{requeue = true},
14051430 _ , State = # ch {unacked_message_q = UAMQ , limiter = Limiter ,
14061431 queue_states = QueueStates0 }) ->
14071432 OkFun = fun () -> ok end ,
1408- UAMQL = queue :to_list (UAMQ ),
1433+ UAMQL = ? QUEUE :to_list (UAMQ ),
14091434 QueueStates =
14101435 foreach_per_queue (
14111436 fun ({QPid , CTag }, MsgIds , Acc0 ) ->
@@ -1419,7 +1444,7 @@ handle_method(#'basic.recover_async'{requeue = true},
14191444 ok = notify_limiter (Limiter , UAMQL ),
14201445 % % No answer required - basic.recover is the newer, synchronous
14211446 % % variant of this method
1422- {noreply , State # ch {unacked_message_q = queue :new (),
1447+ {noreply , State # ch {unacked_message_q = ? QUEUE :new (),
14231448 queue_states = QueueStates }};
14241449
14251450handle_method (# 'basic.recover_async' {requeue = false }, _ , _State ) ->
@@ -1528,7 +1553,7 @@ handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
15281553
15291554handle_method (# 'tx.commit' {}, _ , State = # ch {tx = {Msgs , Acks },
15301555 limiter = Limiter }) ->
1531- State1 = rabbit_misc : queue_fold (fun deliver_to_queues /2 , State , Msgs ),
1556+ State1 = queue_fold (fun deliver_to_queues /2 , State , Msgs ),
15321557 Rev = fun (X ) -> lists :reverse (lists :sort (X )) end ,
15331558 State2 = lists :foldl (fun ({ack , A }, Acc ) ->
15341559 ack (Rev (A ), Acc );
@@ -1543,7 +1568,7 @@ handle_method(#'tx.rollback'{}, _, #ch{tx = none}) ->
15431568handle_method (# 'tx.rollback' {}, _ , State = # ch {unacked_message_q = UAMQ ,
15441569 tx = {_Msgs , Acks }}) ->
15451570 AcksL = lists :append (lists :reverse ([lists :reverse (L ) || {_ , L } <- Acks ])),
1546- UAMQ1 = queue :from_list (lists :usort (AcksL ++ queue :to_list (UAMQ ))),
1571+ UAMQ1 = ? QUEUE :from_list (lists :usort (AcksL ++ ? QUEUE :to_list (UAMQ ))),
15471572 {reply , # 'tx.rollback_ok' {}, State # ch {unacked_message_q = UAMQ1 ,
15481573 tx = new_tx ()}};
15491574
@@ -1835,28 +1860,28 @@ record_sent(Type, Tag, AckRequired,
18351860 end ,
18361861 rabbit_trace :tap_out (Msg , ConnName , ChannelNum , Username , TraceState ),
18371862 UAMQ1 = case AckRequired of
1838- true -> queue :in ({DeliveryTag , Tag , {QPid , MsgId }},
1839- UAMQ );
1863+ true -> ? QUEUE :in ({DeliveryTag , Tag , {QPid , MsgId }},
1864+ UAMQ );
18401865 false -> UAMQ
18411866 end ,
18421867 State # ch {unacked_message_q = UAMQ1 , next_tag = DeliveryTag + 1 }.
18431868
18441869% % NB: returns acks in youngest-first order
18451870collect_acks (Q , 0 , true ) ->
1846- {lists :reverse (queue :to_list (Q )), queue :new ()};
1871+ {lists :reverse (? QUEUE :to_list (Q )), ? QUEUE :new ()};
18471872collect_acks (Q , DeliveryTag , Multiple ) ->
18481873 collect_acks ([], [], Q , DeliveryTag , Multiple ).
18491874
18501875collect_acks (ToAcc , PrefixAcc , Q , DeliveryTag , Multiple ) ->
1851- case queue :out (Q ) of
1876+ case ? QUEUE :out (Q ) of
18521877 {{value , UnackedMsg = {CurrentDeliveryTag , _ConsumerTag , _Msg }},
18531878 QTail } ->
18541879 if CurrentDeliveryTag == DeliveryTag ->
18551880 {[UnackedMsg | ToAcc ],
18561881 case PrefixAcc of
18571882 [] -> QTail ;
1858- _ -> queue :join (
1859- queue :from_list (lists :reverse (PrefixAcc )),
1883+ _ -> ? QUEUE :join (
1884+ ? QUEUE :from_list (lists :reverse (PrefixAcc )),
18601885 QTail )
18611886 end };
18621887 Multiple ->
@@ -1902,7 +1927,7 @@ incr_queue_stats(QPid, QNames, MsgIds, State) ->
19021927% % (reject w requeue), 'false' (reject w/o requeue). The msg ids, as
19031928% % well as the list overall, are in "most-recent (generally youngest)
19041929% % ack first" order.
1905- new_tx () -> {queue :new (), []}.
1930+ new_tx () -> {? QUEUE :new (), []}.
19061931
19071932notify_queues (State = # ch {state = closing }) ->
19081933 {ok , State };
@@ -2134,6 +2159,17 @@ complete_tx(State = #ch{tx = failed}) ->
21342159
21352160infos (Items , State ) -> [{Item , i (Item , State )} || Item <- Items ].
21362161
2162+ infos (Items , Deadline , State ) ->
2163+ [begin
2164+ Now = now_millis (),
2165+ if
2166+ Now > Deadline ->
2167+ throw (timeout );
2168+ true ->
2169+ {Item , i (Item , State )}
2170+ end
2171+ end || Item <- Items ].
2172+
21372173i (pid , _ ) -> self ();
21382174i (connection , # ch {conn_pid = ConnPid }) -> ConnPid ;
21392175i (number , # ch {channel = Channel }) -> Channel ;
@@ -2145,8 +2181,8 @@ i(confirm, #ch{confirm_enabled = CE}) -> CE;
21452181i (name , State ) -> name (State );
21462182i (consumer_count , # ch {consumer_mapping = CM }) -> maps :size (CM );
21472183i (messages_unconfirmed , # ch {unconfirmed = UC }) -> dtree :size (UC );
2148- i (messages_unacknowledged , # ch {unacked_message_q = UAMQ }) -> queue :len (UAMQ );
2149- i (messages_uncommitted , # ch {tx = {Msgs , _Acks }}) -> queue :len (Msgs );
2184+ i (messages_unacknowledged , # ch {unacked_message_q = UAMQ }) -> ? QUEUE :len (UAMQ );
2185+ i (messages_uncommitted , # ch {tx = {Msgs , _Acks }}) -> ? QUEUE :len (Msgs );
21502186i (messages_uncommitted , # ch {}) -> 0 ;
21512187i (acks_uncommitted , # ch {tx = {_Msgs , Acks }}) -> ack_len (Acks );
21522188i (acks_uncommitted , # ch {}) -> 0 ;
@@ -2502,3 +2538,19 @@ qpid_to_ref(Pid) when is_pid(Pid) -> Pid;
25022538qpid_to_ref ({Name , _ }) -> Name ;
25032539% % assume it already is a ref
25042540qpid_to_ref (Ref ) -> Ref .
2541+
2542+ now_millis () ->
2543+ erlang :monotonic_time (millisecond ).
2544+
2545+ get_operation_timeout_and_deadline () ->
2546+ % NB: can't use get_operation_timeout because
2547+ % this code may not be running via the channel Pid
2548+ Timeout = ? CHANNEL_OPERATION_TIMEOUT ,
2549+ Deadline = now_millis () + Timeout ,
2550+ {Timeout , Deadline }.
2551+
2552+ queue_fold (Fun , Init , Q ) ->
2553+ case ? QUEUE :out (Q ) of
2554+ {empty , _Q } -> Init ;
2555+ {{value , V }, Q1 } -> queue_fold (Fun , Fun (V , Init ), Q1 )
2556+ end .
0 commit comments