@@ -980,23 +980,30 @@ delete_transient_in_khepri(FilterFun) ->
980980 [? KHEPRI_WILDCARD_STAR ,
981981 # if_data_matches {
982982 pattern = amqqueue :pattern_match_on_durable (false )}],
983- Ret = rabbit_khepri :fold (
984- PathPattern ,
985- fun (Path , #{data := Queue }, Acc ) ->
986- case FilterFun (Queue ) of
987- true ->
988- QueueName = khepri_queue_path_to_name (Path ),
989- case delete_in_khepri (QueueName , false ) of
990- ok -> Acc ;
991- Deletions -> [{QueueName , Deletions } | Acc ]
992- end ;
993- false ->
994- Acc
995- end
996- end , [],
997- #{copy_tree_and_run_from_caller => true }),
998- case Ret of
999- {ok , Items } ->
983+ % % The `FilterFun' might try to determine if the queue's process is alive.
984+ % % This can cause a `calling_self' exception if we use the `FilterFun'
985+ % % within the function passed to `khepri:fold/5' since the Khepri server
986+ % % process might call itself. Instead we can fetch all of the transient
987+ % % queues with `get_many' and then filter and fold the results outside of
988+ % % Khepri's Ra server process.
989+ case rabbit_khepri :get_many (PathPattern ) of
990+ {ok , Qs } ->
991+ Items = maps :fold (
992+ fun (Path , #{data := Queue }, Acc ) ->
993+ case FilterFun (Queue ) of
994+ true ->
995+ QueueName = khepri_queue_path_to_name (
996+ Path ),
997+ case delete_in_khepri (QueueName , false ) of
998+ ok ->
999+ Acc ;
1000+ Deletions ->
1001+ [{QueueName , Deletions } | Acc ]
1002+ end ;
1003+ false ->
1004+ Acc
1005+ end
1006+ end , [], Qs ),
10001007 {QueueNames , Deletions } = lists :unzip (Items ),
10011008 {QueueNames , lists :flatten (Deletions )};
10021009 {error , _ } = Error ->
0 commit comments