@@ -230,18 +230,23 @@ start_cluster(Q) ->
230230 {error , {too_long , N }} ->
231231 rabbit_data_coercion :to_atom (ra :new_uid (N ))
232232 end ,
233- {Leader , Followers } = rabbit_queue_location :select_leader_and_followers (Q , QuorumSize ),
234- LeaderId = {RaName , Leader },
233+ {LeaderNode , FollowerNodes } =
234+ rabbit_queue_location :select_leader_and_followers (Q , QuorumSize ),
235+ LeaderId = {RaName , LeaderNode },
235236 NewQ0 = amqqueue :set_pid (Q , LeaderId ),
236- NewQ1 = amqqueue :set_type_state (NewQ0 , #{nodes => [Leader | Followers ]}),
237+ NewQ1 = amqqueue :set_type_state (NewQ0 ,
238+ #{nodes => [LeaderNode | FollowerNodes ]}),
237239
238240 rabbit_log :debug (" Will start up to ~w replicas for quorum ~ts with leader on node '~ts '" ,
239- [QuorumSize , rabbit_misc :rs (QName ), Leader ]),
241+ [QuorumSize , rabbit_misc :rs (QName ), LeaderNode ]),
240242 case rabbit_amqqueue :internal_declare (NewQ1 , false ) of
241243 {created , NewQ } ->
242244 RaConfs = [make_ra_conf (NewQ , ServerId )
243245 || ServerId <- members (NewQ )],
244- try erpc_call (Leader , ra , start_cluster ,
246+
247+ % % khepri projections on remote nodes are eventually consistent
248+ wait_for_projections (LeaderNode , QName ),
249+ try erpc_call (LeaderNode , ra , start_cluster ,
245250 [? RA_SYSTEM , RaConfs , ? START_CLUSTER_TIMEOUT ],
246251 ? START_CLUSTER_RPC_TIMEOUT ) of
247252 {ok , _ , _ } ->
@@ -266,10 +271,10 @@ start_cluster(Q) ->
266271 ActingUser }]),
267272 {new , NewQ };
268273 {error , Error } ->
269- declare_queue_error (Error , NewQ , Leader , ActingUser )
274+ declare_queue_error (Error , NewQ , LeaderNode , ActingUser )
270275 catch
271276 error :Error ->
272- declare_queue_error (Error , NewQ , Leader , ActingUser )
277+ declare_queue_error (Error , NewQ , LeaderNode , ActingUser )
273278 end ;
274279 {existing , _ } = Ex ->
275280 Ex
@@ -359,26 +364,28 @@ local_or_remote_handler(ChPid, Module, Function, Args) ->
359364 end .
360365
361366become_leader (QName , Name ) ->
367+ % % as this function is called synchronously when a ra node becomes leader
368+ % % we need to ensure there is no chance of blocking as else the ra node
369+ % % may not be able to establish its leadership
370+ spawn (fun () -> become_leader0 (QName , Name ) end ).
371+
372+ become_leader0 (QName , Name ) ->
362373 Fun = fun (Q1 ) ->
363374 amqqueue :set_state (
364375 amqqueue :set_pid (Q1 , {Name , node ()}),
365376 live )
366377 end ,
367- % % as this function is called synchronously when a ra node becomes leader
368- % % we need to ensure there is no chance of blocking as else the ra node
369- % % may not be able to establish its leadership
370- spawn (fun () ->
371- _ = rabbit_amqqueue :update (QName , Fun ),
372- case rabbit_amqqueue :lookup (QName ) of
373- {ok , Q0 } when ? is_amqqueue (Q0 ) ->
374- Nodes = get_nodes (Q0 ),
375- [_ = erpc_call (Node , ? MODULE , rpc_delete_metrics ,
376- [QName ], ? RPC_TIMEOUT )
377- || Node <- Nodes , Node =/= node ()];
378- _ ->
379- ok
380- end
381- end ).
378+ _ = rabbit_amqqueue :update (QName , Fun ),
379+ case rabbit_amqqueue :lookup (QName ) of
380+ {ok , Q0 } when ? is_amqqueue (Q0 ) ->
381+ Nodes = get_nodes (Q0 ),
382+ _ = [_ = erpc_call (Node , ? MODULE , rpc_delete_metrics ,
383+ [QName ], ? RPC_TIMEOUT )
384+ || Node <- Nodes , Node =/= node ()],
385+ ok ;
386+ _ ->
387+ ok
388+ end .
382389
383390-spec all_replica_states () -> {node (), #{atom () => atom ()}}.
384391all_replica_states () ->
@@ -550,7 +557,7 @@ handle_tick(QName,
550557 catch
551558 _ :Err ->
552559 rabbit_log :debug (" ~ts : handle tick failed with ~p " ,
553- [rabbit_misc :rs (QName ), Err ]),
560+ [rabbit_misc :rs (QName ), Err ]),
554561 ok
555562 end
556563 end ).
@@ -566,7 +573,7 @@ repair_leader_record(QName, Self) ->
566573 rabbit_log :debug (" ~ts : repairing leader record" ,
567574 [rabbit_misc :rs (QName )]),
568575 {_ , Name } = erlang :process_info (Self , registered_name ),
569- become_leader (QName , Name ),
576+ ok = become_leader0 (QName , Name ),
570577 ok
571578 end ,
572579 ok .
@@ -633,7 +640,7 @@ recover(_Vhost, Queues) ->
633640 Err1 == name_not_registered ->
634641 rabbit_log :warning (" Quorum queue recovery: configured member of ~ts was not found on this node. Starting member as a new one. "
635642 " Context: ~s " ,
636- [rabbit_misc :rs (QName ), Err1 ]),
643+ [rabbit_misc :rs (QName ), Err1 ]),
637644 % queue was never started on this node
638645 % so needs to be started from scratch.
639646 case start_server (make_ra_conf (Q0 , ServerId )) of
@@ -1901,3 +1908,23 @@ force_all_queues_shrink_member_to_current_member() ->
19011908is_minority (All , Up ) ->
19021909 MinQuorum = length (All ) div 2 + 1 ,
19031910 length (Up ) < MinQuorum .
1911+
1912+ wait_for_projections (Node , QName ) ->
1913+ case rabbit_feature_flags :is_enabled (khepri_db ) andalso
1914+ Node =/= node () of
1915+ true ->
1916+ wait_for_projections (Node , QName , 256 );
1917+ false ->
1918+ ok
1919+ end .
1920+
1921+ wait_for_projections (Node , QName , 0 ) ->
1922+ exit ({wait_for_projections_timed_out , Node , QName });
1923+ wait_for_projections (Node , QName , N ) ->
1924+ case erpc_call (Node , rabbit_amqqueue , lookup , [QName ], 100 ) of
1925+ {ok , _ } ->
1926+ ok ;
1927+ _ ->
1928+ timer :sleep (100 ),
1929+ wait_for_projections (Node , QName , N - 1 )
1930+ end .
0 commit comments