3939-export ([open_files /1 ]).
4040-export ([peek /2 , peek /3 ]).
4141-export ([add_member /2 ,
42- add_member /4 ]).
42+ add_member /3 ,
43+ add_member /4 ,
44+ add_member /5 ]).
4345-export ([delete_member /3 , delete_member /2 ]).
4446-export ([requeue /3 ]).
4547-export ([policy_changed /1 ]).
4648-export ([format_ra_event /3 ]).
4749-export ([cleanup_data_dir /0 ]).
4850-export ([shrink_all /1 ,
49- grow /4 ]).
51+ grow /4 ,
52+ grow /5 ]).
5053-export ([transfer_leadership /2 , get_replicas /1 , queue_length /1 ]).
5154-export ([file_handle_leader_reservation /1 ,
5255 file_handle_other_reservation /0 ]).
5356-export ([file_handle_release_reservation /0 ]).
5457-export ([list_with_minimum_quorum /0 ,
5558 filter_quorum_critical /1 ,
56- filter_quorum_critical /2 ,
59+ filter_quorum_critical /3 ,
5760 all_replica_states /0 ]).
5861-export ([capabilities /0 ]).
5962-export ([repair_amqqueue_nodes /1 ,
8487-type msg_id () :: non_neg_integer ().
8588-type qmsg () :: {rabbit_types :r ('queue' ), pid (), msg_id (), boolean (),
8689 mc :state ()}.
90+ -type membership () :: voter | non_voter | promotable . % % see ra_membership() in Ra.
8791
8892-define (RA_SYSTEM , quorum_queues ).
8993-define (RA_WAL_NAME , ra_log_wal ).
@@ -384,13 +388,15 @@ become_leader(QName, Name) ->
384388all_replica_states () ->
385389 Rows0 = ets :tab2list (ra_state ),
386390 Rows = lists :map (fun
387- % % TODO: support other membership types
391+ ({K , follower , promotable }) ->
392+ {K , promotable };
393+ ({K , follower , non_voter }) ->
394+ {K , non_voter };
388395 ({K , S , voter }) ->
389396 {K , S };
390397 (T ) ->
391398 T
392399 end , Rows0 ),
393-
394400 {node (), maps :from_list (Rows )}.
395401
396402- spec list_with_minimum_quorum () -> [amqqueue :amqqueue ()].
@@ -419,20 +425,22 @@ filter_quorum_critical(Queues) ->
419425 ReplicaStates = maps :from_list (
420426 rabbit_misc :append_rpc_all_nodes (rabbit_nodes :list_running (),
421427 ? MODULE , all_replica_states , [])),
422- filter_quorum_critical (Queues , ReplicaStates ).
428+ filter_quorum_critical (Queues , ReplicaStates , node () ).
423429
424- - spec filter_quorum_critical ([amqqueue :amqqueue ()], #{node () => #{atom () => atom ()}}) -> [amqqueue :amqqueue ()].
430+ - spec filter_quorum_critical ([amqqueue :amqqueue ()], #{node () => #{atom () => atom ()}}, node ()) ->
431+ [amqqueue :amqqueue ()].
425432
426- filter_quorum_critical (Queues , ReplicaStates ) ->
433+ filter_quorum_critical (Queues , ReplicaStates , Self ) ->
427434 lists :filter (fun (Q ) ->
428435 MemberNodes = rabbit_amqqueue :get_quorum_nodes (Q ),
429436 {Name , _Node } = amqqueue :get_pid (Q ),
430437 AllUp = lists :filter (fun (N ) ->
431- {Name , _ } = amqqueue :get_pid (Q ),
432438 case maps :get (N , ReplicaStates , undefined ) of
433439 #{Name := State }
434440 when State =:= follower orelse
435- State =:= leader ->
441+ State =:= leader orelse
442+ (State =:= promotable andalso N =:= Self ) orelse
443+ (State =:= non_voter andalso N =:= Self ) ->
436444 true ;
437445 _ -> false
438446 end
@@ -1143,7 +1151,7 @@ get_sys_status(Proc) ->
11431151
11441152 end .
11451153
1146- add_member (VHost , Name , Node , Timeout ) ->
1154+ add_member (VHost , Name , Node , Membership , Timeout ) when is_binary ( VHost ) ->
11471155 QName = # resource {virtual_host = VHost , name = Name , kind = queue },
11481156 rabbit_log :debug (" Asked to add a replica for queue ~ts on node ~ts " , [rabbit_misc :rs (QName ), Node ]),
11491157 case rabbit_amqqueue :lookup (QName ) of
@@ -1161,7 +1169,7 @@ add_member(VHost, Name, Node, Timeout) ->
11611169 rabbit_log :debug (" Quorum ~ts already has a replica on node ~ts " , [rabbit_misc :rs (QName ), Node ]),
11621170 ok ;
11631171 false ->
1164- add_member (Q , Node , Timeout )
1172+ add_member (Q , Node , Membership , Timeout )
11651173 end
11661174 end ;
11671175 {ok , _Q } ->
@@ -1171,9 +1179,15 @@ add_member(VHost, Name, Node, Timeout) ->
11711179 end .
11721180
11731181add_member (Q , Node ) ->
1174- add_member (Q , Node , ? ADD_MEMBER_TIMEOUT ).
1182+ add_member (Q , Node , promotable ).
1183+
1184+ add_member (Q , Node , Membership ) ->
1185+ add_member (Q , Node , Membership , ? ADD_MEMBER_TIMEOUT ).
11751186
1176- add_member (Q , Node , Timeout ) when ? amqqueue_is_quorum (Q ) ->
1187+ add_member (VHost , Name , Node , Timeout ) when is_binary (VHost ) ->
1188+ % % NOTE needed to pass mixed cluster tests.
1189+ add_member (VHost , Name , Node , promotable , Timeout );
1190+ add_member (Q , Node , Membership , Timeout ) when ? amqqueue_is_quorum (Q ) ->
11771191 {RaName , _ } = amqqueue :get_pid (Q ),
11781192 QName = amqqueue :get_name (Q ),
11791193 % % TODO parallel calls might crash this, or add a duplicate in quorum_nodes
@@ -1183,7 +1197,7 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
11831197 ? TICK_TIMEOUT ),
11841198 SnapshotInterval = application :get_env (rabbit , quorum_snapshot_interval ,
11851199 ? SNAPSHOT_INTERVAL ),
1186- Conf = make_ra_conf (Q , ServerId , TickTimeout , SnapshotInterval , voter ),
1200+ Conf = make_ra_conf (Q , ServerId , TickTimeout , SnapshotInterval , Membership ),
11871201 case ra :start_server (? RA_SYSTEM , Conf ) of
11881202 ok ->
11891203 ServerIdSpec = maps :with ([id , uid , membership ], Conf ),
@@ -1295,17 +1309,21 @@ shrink_all(Node) ->
12951309 amqqueue :get_type (Q ) == ? MODULE ,
12961310 lists :member (Node , get_nodes (Q ))].
12971311
1298- - spec grow (node (), binary (), binary (), all | even ) ->
1312+
1313+ grow (Node , VhostSpec , QueueSpec , Strategy ) ->
1314+ grow (Node , VhostSpec , QueueSpec , Strategy , promotable ).
1315+
1316+ - spec grow (node (), binary (), binary (), all | even , membership ()) ->
12991317 [{rabbit_amqqueue :name (),
13001318 {ok , pos_integer ()} | {error , pos_integer (), term ()}}].
1301- grow (Node , VhostSpec , QueueSpec , Strategy ) ->
1319+ grow (Node , VhostSpec , QueueSpec , Strategy , Membership ) ->
13021320 Running = rabbit_nodes :list_running (),
13031321 [begin
13041322 Size = length (get_nodes (Q )),
13051323 QName = amqqueue :get_name (Q ),
13061324 rabbit_log :info (" ~ts : adding a new member (replica) on node ~w " ,
13071325 [rabbit_misc :rs (QName ), Node ]),
1308- case add_member (Q , Node , ? ADD_MEMBER_TIMEOUT ) of
1326+ case add_member (Q , Node , Membership ) of
13091327 ok ->
13101328 {QName , {ok , Size + 1 }};
13111329 {error , Err } ->
0 commit comments