@@ -919,8 +919,12 @@ notify_policy_changed(#amqqueue{pid = QPid,
919919 name = QName }) when ? IS_QUORUM (QPid ) ->
920920 rabbit_quorum_queue :policy_changed (QName , QPid ).
921921
922- consumers (# amqqueue { pid = QPid }) ->
923- delegate :invoke (QPid , {gen_server2 , call , [consumers , infinity ]}).
922+ consumers (# amqqueue {pid = QPid }) when ? IS_CLASSIC (QPid ) ->
923+ delegate :invoke (QPid , {gen_server2 , call , [consumers , infinity ]});
924+ consumers (# amqqueue {pid = QPid }) when ? IS_QUORUM (QPid ) ->
925+ {ok , {_ , Result }, _ } = ra :local_query (QPid ,
926+ fun rabbit_fifo :query_consumers /1 ),
927+ maps :values (Result ).
924928
925929consumer_info_keys () -> ? CONSUMER_INFO_KEYS .
926930
@@ -1147,14 +1151,15 @@ basic_get(#amqqueue{pid = {Name, _} = Id, type = quorum, name = QName} = Q, _ChP
11471151 [rabbit_misc :rs (QName ), Reason ])
11481152 end .
11491153
1150- basic_consume (# amqqueue {pid = QPid , name = QName , type = classic }, NoAck , ChPid , LimiterPid ,
1151- LimiterActive , ConsumerPrefetchCount , ConsumerTag ,
1154+ basic_consume (# amqqueue {pid = QPid , name = QName , type = classic }, NoAck , ChPid ,
1155+ LimiterPid , LimiterActive , ConsumerPrefetchCount , ConsumerTag ,
11521156 ExclusiveConsume , Args , OkMsg , ActingUser , QState ) ->
11531157 ok = check_consume_arguments (QName , Args ),
1154- case delegate :invoke (QPid , {gen_server2 , call ,
1155- [{basic_consume , NoAck , ChPid , LimiterPid , LimiterActive ,
1156- ConsumerPrefetchCount , ConsumerTag , ExclusiveConsume ,
1157- Args , OkMsg , ActingUser }, infinity ]}) of
1158+ case delegate :invoke (QPid ,
1159+ {gen_server2 , call ,
1160+ [{basic_consume , NoAck , ChPid , LimiterPid , LimiterActive ,
1161+ ConsumerPrefetchCount , ConsumerTag , ExclusiveConsume ,
1162+ Args , OkMsg , ActingUser }, infinity ]}) of
11581163 ok ->
11591164 {ok , QState };
11601165 Err ->
@@ -1164,15 +1169,17 @@ basic_consume(#amqqueue{type = quorum}, _NoAck, _ChPid,
11641169 _LimiterPid , true , _ConsumerPrefetchCount , _ConsumerTag ,
11651170 _ExclusiveConsume , _Args , _OkMsg , _ActingUser , _QStates ) ->
11661171 {error , global_qos_not_supported_for_queue_type };
1167- basic_consume (# amqqueue {pid = {Name , _ } = Id , name = QName , type = quorum } = Q , NoAck , ChPid ,
1168- _LimiterPid , _LimiterActive , ConsumerPrefetchCount , ConsumerTag ,
1169- ExclusiveConsume , Args , OkMsg , _ActingUser , QStates ) ->
1172+ basic_consume (# amqqueue {pid = {Name , _ } = Id , name = QName , type = quorum } = Q ,
1173+ NoAck , ChPid , _LimiterPid , _LimiterActive , ConsumerPrefetchCount ,
1174+ ConsumerTag , ExclusiveConsume , Args , OkMsg ,
1175+ ActingUser , QStates ) ->
11701176 ok = check_consume_arguments (QName , Args ),
11711177 QState0 = get_quorum_state (Id , QName , QStates ),
11721178 {ok , QState } = rabbit_quorum_queue :basic_consume (Q , NoAck , ChPid ,
11731179 ConsumerPrefetchCount ,
11741180 ConsumerTag ,
11751181 ExclusiveConsume , Args ,
1182+ ActingUser ,
11761183 OkMsg , QState0 ),
11771184 {ok , maps :put (Name , QState , QStates )}.
11781185
0 commit comments