@@ -180,7 +180,7 @@ is_compatible(_, _, _) ->
180180init (Q ) when ? is_amqqueue (Q ) ->
181181 {ok , SoftLimit } = application :get_env (rabbit , quorum_commands_soft_limit ),
182182 {Name , _ } = MaybeLeader = amqqueue :get_pid (Q ),
183- Leader = case ra_leaderboard : lookup_leader ( Name ) of
183+ Leader = case find_leader ( Q ) of
184184 undefined ->
185185 % % leader from queue record will have to suffice
186186 MaybeLeader ;
@@ -1349,6 +1349,23 @@ shrink_all(Node) ->
13491349 case delete_member (Q , Node ) of
13501350 ok ->
13511351 {QName , {ok , Size - 1 }};
1352+ {error , cluster_change_not_permitted } ->
1353+ % % this could be timing related and due to a new leader just being
1354+ % % elected but it's noop command not been committed yet.
1355+ % % lets sleep and retry once
1356+ rabbit_log :info (" ~ts : failed to remove member (replica) on node ~w "
1357+ " as cluster change is not permitted. "
1358+ " retrying once in 500ms" ,
1359+ [rabbit_misc :rs (QName ), Node ]),
1360+ timer :sleep (500 ),
1361+ case delete_member (Q , Node ) of
1362+ ok ->
1363+ {QName , {ok , Size - 1 }};
1364+ {error , Err } ->
1365+ rabbit_log :warning (" ~ts : failed to remove member (replica) on node ~w , error: ~w " ,
1366+ [rabbit_misc :rs (QName ), Node , Err ]),
1367+ {QName , {error , Size , Err }}
1368+ end ;
13521369 {error , Err } ->
13531370 rabbit_log :warning (" ~ts : failed to remove member (replica) on node ~w , error: ~w " ,
13541371 [rabbit_misc :rs (QName ), Node , Err ]),
@@ -1663,10 +1680,16 @@ open_files(Name) ->
16631680 end .
16641681
16651682leader (Q ) when ? is_amqqueue (Q ) ->
1666- {Name , Leader } = amqqueue :get_pid (Q ),
1667- case is_process_alive (Name , Leader ) of
1668- true -> Leader ;
1669- false -> ''
1683+ case find_leader (Q ) of
1684+ undefined ->
1685+ '' ;
1686+ {Name , LeaderNode } ->
1687+ case is_process_alive (Name , LeaderNode ) of
1688+ true ->
1689+ LeaderNode ;
1690+ false ->
1691+ ''
1692+ end
16701693 end .
16711694
16721695peek (Vhost , Queue , Pos ) ->
@@ -1742,12 +1765,6 @@ format(Q, Ctx) when ?is_amqqueue(Q) ->
17421765 {leader , LeaderNode },
17431766 {online , Online }].
17441767
1745- is_process_alive (Name , Node ) ->
1746- % % don't attempt rpc if node is not already connected
1747- % % as this function is used for metrics and stats and the additional
1748- % % latency isn't warranted
1749- erlang :is_pid (erpc_call (Node , erlang , whereis , [Name ], ? RPC_TIMEOUT )).
1750-
17511768- spec quorum_messages (rabbit_amqqueue :name ()) -> non_neg_integer ().
17521769
17531770quorum_messages (QName ) ->
@@ -1930,3 +1947,30 @@ wait_for_projections(Node, QName, N) ->
19301947 timer :sleep (100 ),
19311948 wait_for_projections (Node , QName , N - 1 )
19321949 end .
1950+
1951+ find_leader (Q ) when ? is_amqqueue (Q ) ->
1952+ % % the get_pid field in the queue record is updated async after a leader
1953+ % % change, so is likely to be the more stale than the leaderboard
1954+ {Name , _Node } = MaybeLeader = amqqueue :get_pid (Q ),
1955+ Leaders = case ra_leaderboard :lookup_leader (Name ) of
1956+ undefined ->
1957+ % % leader from queue record will have to suffice
1958+ [MaybeLeader ];
1959+ LikelyLeader ->
1960+ [LikelyLeader , MaybeLeader ]
1961+ end ,
1962+ Nodes = [node () | nodes ()],
1963+ case lists :search (fun ({_Nm , Nd }) ->
1964+ lists :member (Nd , Nodes )
1965+ end , Leaders ) of
1966+ {value , Leader } ->
1967+ Leader ;
1968+ false ->
1969+ undefined
1970+ end .
1971+
1972+ is_process_alive (Name , Node ) ->
1973+ % % don't attempt rpc if node is not already connected
1974+ % % as this function is used for metrics and stats and the additional
1975+ % % latency isn't warranted
1976+ erlang :is_pid (erpc_call (Node , erlang , whereis , [Name ], ? RPC_TIMEOUT )).
0 commit comments