diff --git a/MODULE.bazel b/MODULE.bazel index 24125e5d7ed8..eee0e09066f8 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -210,15 +210,15 @@ erlang_package.hex_package( erlang_package.hex_package( name = "khepri", build_file = "@rabbitmq-server//bazel:BUILD.khepri", - sha256 = "dccfaeb3583a04722e2258911f7f906ce67f8efac80504be4923aaafae6d4e21", - version = "0.14.0", + sha256 = "3fca316af28f0a7524be01164a3e9dd484505f18887c5c2065e0db40802522d1", + version = "0.15.0", ) erlang_package.hex_package( name = "khepri_mnesia_migration", build_file = "@rabbitmq-server//bazel:BUILD.khepri_mnesia_migration", - sha256 = "f56d277ca7876371615cef9c5674c78854f31cf9f26ce97fd3f4b5a65573ccc4", - version = "0.5.0", + sha256 = "c2426e113ca9901180cc141967ef81c0beaba2bf702ed1456360b6ec02280a71", + version = "0.6.0", ) erlang_package.hex_package( @@ -253,8 +253,8 @@ erlang_package.hex_package( name = "ra", build_file = "@rabbitmq-server//bazel:BUILD.ra", pkg = "ra", - sha256 = "0be7645dce4a76edd4c4642d0fa69639518c72b6b60a34fc86590d1909166aeb", - version = "2.13.6", + sha256 = "1d553dd971a0b398b7af0fa8c8458dda575715ff71c65c972e9500b24039b240", + version = "2.14.0", ) erlang_package.git_package( diff --git a/deps/rabbit/src/rabbit_db_maintenance.erl b/deps/rabbit/src/rabbit_db_maintenance.erl index 0a39e8db4506..46de278f1d17 100644 --- a/deps/rabbit/src/rabbit_db_maintenance.erl +++ b/deps/rabbit/src/rabbit_db_maintenance.erl @@ -155,11 +155,7 @@ get_consistent_in_mnesia(Node) -> get_consistent_in_khepri(Node) -> Path = khepri_maintenance_path(Node), - %% FIXME: Ra consistent queries are fragile in the sense that the query - %% function may run on a remote node and the function reference or MFA may - %% not be valid on that node. That's why we force a local query for now. - %Options = #{favor => consistent}, - Options = #{favor => local}, + Options = #{favor => consistency}, case rabbit_khepri:get(Path, Options) of {ok, #node_maintenance_state{status = Status}} -> Status; diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 7b8c4ee709f7..9d28760d0b19 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -264,19 +264,23 @@ setup(_) -> friendly_name => ?RA_FRIENDLY_NAME}, case khepri:start(?RA_SYSTEM, RaServerConfig) of {ok, ?STORE_ID} -> - wait_for_leader(), - wait_for_register_projections(), - ?LOG_DEBUG( - "Khepri-based " ?RA_FRIENDLY_NAME " ready", - #{domain => ?RMQLOG_DOMAIN_GLOBAL}), - ok; + RetryTimeout = retry_timeout(), + case khepri_cluster:wait_for_leader(?STORE_ID, RetryTimeout) of + ok -> + wait_for_register_projections(), + ?LOG_DEBUG( + "Khepri-based " ?RA_FRIENDLY_NAME " ready", + #{domain => ?RMQLOG_DOMAIN_GLOBAL}), + ok; + {error, timeout} -> + exit(timeout_waiting_for_leader); + {error, _} = Error -> + exit(Error) + end; {error, _} = Error -> exit(Error) end. -wait_for_leader() -> - wait_for_leader(retry_timeout(), retry_limit()). - retry_timeout() -> case application:get_env(rabbit, khepri_leader_wait_retry_timeout) of {ok, T} -> T; @@ -289,25 +293,6 @@ retry_limit() -> undefined -> 10 end. -wait_for_leader(_Timeout, 0) -> - exit(timeout_waiting_for_leader); -wait_for_leader(Timeout, Retries) -> - rabbit_log:info("Waiting for Khepri leader for ~tp ms, ~tp retries left", - [Timeout, Retries - 1]), - Options = #{timeout => Timeout, - favor => low_latency}, - case khepri:exists(?STORE_ID, [], Options) of - Exists when is_boolean(Exists) -> - rabbit_log:info("Khepri leader elected"), - ok; - {error, timeout} -> %% Khepri >= 0.14.0 - wait_for_leader(Timeout, Retries -1); - {error, {timeout, _ServerId}} -> %% Khepri < 0.14.0 - wait_for_leader(Timeout, Retries -1); - {error, Reason} -> - throw(Reason) - end. - wait_for_register_projections() -> wait_for_register_projections(retry_timeout(), retry_limit()). @@ -940,50 +925,46 @@ cas(Path, Pattern, Data) -> ?STORE_ID, Path, Pattern, Data, ?DEFAULT_COMMAND_OPTIONS). fold(Path, Pred, Acc) -> - khepri:fold(?STORE_ID, Path, Pred, Acc, #{favor => low_latency}). + khepri:fold(?STORE_ID, Path, Pred, Acc). fold(Path, Pred, Acc, Options) -> - Options1 = Options#{favor => low_latency}, - khepri:fold(?STORE_ID, Path, Pred, Acc, Options1). + khepri:fold(?STORE_ID, Path, Pred, Acc, Options). foreach(Path, Pred) -> - khepri:foreach(?STORE_ID, Path, Pred, #{favor => low_latency}). + khepri:foreach(?STORE_ID, Path, Pred). filter(Path, Pred) -> - khepri:filter(?STORE_ID, Path, Pred, #{favor => low_latency}). + khepri:filter(?STORE_ID, Path, Pred). get(Path) -> - khepri:get(?STORE_ID, Path, #{favor => low_latency}). + khepri:get(?STORE_ID, Path). get(Path, Options) -> - Options1 = Options#{favor => low_latency}, - khepri:get(?STORE_ID, Path, Options1). + khepri:get(?STORE_ID, Path, Options). get_many(PathPattern) -> - khepri:get_many(?STORE_ID, PathPattern, #{favor => low_latency}). + khepri:get_many(?STORE_ID, PathPattern). adv_get(Path) -> - khepri_adv:get(?STORE_ID, Path, #{favor => low_latency}). + khepri_adv:get(?STORE_ID, Path). adv_get_many(PathPattern) -> - khepri_adv:get_many(?STORE_ID, PathPattern, #{favor => low_latency}). + khepri_adv:get_many(?STORE_ID, PathPattern). match(Path) -> match(Path, #{}). match(Path, Options) -> - Options1 = Options#{favor => low_latency}, - khepri:get_many(?STORE_ID, Path, Options1). + khepri:get_many(?STORE_ID, Path, Options). -exists(Path) -> khepri:exists(?STORE_ID, Path, #{favor => low_latency}). +exists(Path) -> khepri:exists(?STORE_ID, Path). list(Path) -> khepri:get_many( - ?STORE_ID, Path ++ [?KHEPRI_WILDCARD_STAR], #{favor => low_latency}). + ?STORE_ID, Path ++ [?KHEPRI_WILDCARD_STAR]). list_child_nodes(Path) -> - Options = #{props_to_return => [child_names], - favor => low_latency}, + Options = #{props_to_return => [child_names]}, case khepri_adv:get_many(?STORE_ID, Path, Options) of {ok, Result} -> case maps:values(Result) of @@ -997,8 +978,7 @@ list_child_nodes(Path) -> end. count_children(Path) -> - Options = #{props_to_return => [child_list_length], - favor => low_latency}, + Options = #{props_to_return => [child_list_length]}, case khepri_adv:get_many(?STORE_ID, Path, Options) of {ok, Map} -> lists:sum([L || #{child_list_length := L} <- maps:values(Map)]); @@ -1049,18 +1029,9 @@ transaction(Fun) -> transaction(Fun, ReadWrite) -> transaction(Fun, ReadWrite, #{}). -transaction(Fun, ReadWrite, Options0) -> - %% If the transaction is read-only, use the same default options we use - %% for most queries. - DefaultQueryOptions = case ReadWrite of - ro -> - #{favor => low_latency}; - _ -> - #{} - end, - Options1 = maps:merge(DefaultQueryOptions, Options0), - Options = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options1), - case khepri:transaction(?STORE_ID, Fun, ReadWrite, Options) of +transaction(Fun, ReadWrite, Options) -> + Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options), + case khepri:transaction(?STORE_ID, Fun, ReadWrite, Options1) of ok -> ok; {ok, Result} -> Result; {error, Reason} -> throw({error, Reason}) diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index 6d88fe932b1e..86f3138ac38e 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -46,10 +46,10 @@ dep_credentials_obfuscation = hex 3.4.0 dep_cuttlefish = hex 3.4.0 dep_gen_batch_server = hex 0.8.8 dep_jose = hex 1.11.10 -dep_khepri = hex 0.14.0 -dep_khepri_mnesia_migration = hex 0.5.0 +dep_khepri = hex 0.15.0 +dep_khepri_mnesia_migration = hex 0.6.0 dep_prometheus = hex 4.11.0 -dep_ra = hex 2.13.6 +dep_ra = hex 2.14.0 dep_ranch = hex 2.1.0 dep_recon = hex 2.5.3 dep_redbug = hex 2.0.7