96 changes: 93 additions & 3 deletions src/m2k_cluster_sync.erl
@@ -117,7 +117,33 @@ do_sync_cluster_locked(#?MODULE{khepri_store = StoreId}) ->
[MnesiaCluster],
#{domain => ?KMM_M2K_CLUSTER_SYNC_LOG_DOMAIN}),

LargestKhepriCluster = find_largest_khepri_cluster(MnesiaCluster, StoreId),
NodesToConsider = case MnesiaCluster of
[SingleNode] ->
%% If the node is unclustered according to
%% Mnesia, we consider connected nodes that
%% already run the Khepri store.
%%
%% This allows repairing a cluster where a node
%% lost its disk, for instance. In this
%% situation, Mnesia thinks the node is
%% unclustered, but Khepri on the other nodes
%% still thinks the lost node is clustered.
%%
%% See `find_largest_khepri_cluster/2' for the
%% rest of the logic.
PossibleNodes = list_possible_nodes(StoreId),
?LOG_DEBUG(
"Mnesia->Khepri cluster sync: "
"Connected nodes to consider: ~0p",
[PossibleNodes],
#{domain => ?KMM_M2K_CLUSTER_SYNC_LOG_DOMAIN}),
[SingleNode | PossibleNodes];
_ ->
MnesiaCluster
end,

LargestKhepriCluster = find_largest_khepri_cluster(
NodesToConsider, StoreId),
?LOG_DEBUG(
"Mnesia->Khepri cluster sync: Largest Khepri cluster: ~0p",
[LargestKhepriCluster],
@@ -133,7 +159,7 @@ do_sync_cluster_locked(#?MODULE{khepri_store = StoreId}) ->
add_nodes_to_khepri_cluster(NodesToAdd, LargestKhepriCluster, StoreId),

KhepriCluster = khepri_cluster_on_node(hd(LargestKhepriCluster), StoreId),
NodesToRemove = KhepriCluster -- MnesiaCluster,
NodesToRemove = KhepriCluster -- NodesToConsider,
?LOG_DEBUG(
"Mnesia->Khepri cluster sync: Khepri nodes being removed from the "
"expanded Khepri cluster: ~0p",
@@ -142,11 +168,24 @@ do_sync_cluster_locked(#?MODULE{khepri_store = StoreId}) ->

remove_nodes_from_khepri_cluster(NodesToRemove, StoreId).

list_possible_nodes(StoreId) ->
ConnectedNodes = nodes(),
lists:filter(
fun(Node) ->
try
erpc:call(Node, khepri_cluster, is_store_running, [StoreId])
catch
_:_ ->
false
end
end, ConnectedNodes).

find_largest_khepri_cluster(Nodes, StoreId) ->
KhepriClusters0 = list_all_khepri_clusters(Nodes, StoreId),
KhepriClusters1 = remove_khepri_nodes_not_in_mnesia_cluster(
Nodes, KhepriClusters0),
SortedKhepriClusters = sort_khepri_clusters(KhepriClusters1, StoreId),
KhepriClusters2 = discard_nodes_who_lost_their_data(KhepriClusters1),
SortedKhepriClusters = sort_khepri_clusters(KhepriClusters2, StoreId),
?LOG_DEBUG(
"Mnesia->Khepri cluster sync: Khepri clusters: ~0p",
[SortedKhepriClusters],
@@ -200,6 +239,57 @@ remove_khepri_nodes_not_in_mnesia_cluster1(MnesiaCluster, KhepriCluster) ->
lists:member(KhepriNode, MnesiaCluster)
end, KhepriCluster).

discard_nodes_who_lost_their_data(KhepriClusters) ->
discard_nodes_who_lost_their_data(KhepriClusters, KhepriClusters, []).

discard_nodes_who_lost_their_data(
[[SingleNode] | Rest],
KhepriClusters,
LostNodes) ->
%% We check whether a standalone node is also a member of another cluster.
%% If it is, it means the standalone node lost its state and no longer
%% knows that it is already clustered. The other members still consider it
%% clustered and don't know the node lost its state.
%%
%% If we find such a node, we discard it from the list of Khepri clusters
%% and delete it from the other clusters. This way, the rest of the logic
%% will consider that the lost node is unclustered.
IsMemberElsewhere = lists:any(
fun
(KhepriCluster)
when length(KhepriCluster) =:= 1 ->
false;
(KhepriCluster) ->
lists:member(SingleNode, KhepriCluster)
end, KhepriClusters),
LostNodes1 = case IsMemberElsewhere of
false -> LostNodes;
true -> [SingleNode | LostNodes]
end,
discard_nodes_who_lost_their_data(Rest, KhepriClusters, LostNodes1);
discard_nodes_who_lost_their_data(
[_KhepriCluster | Rest],
KhepriClusters,
LostNodes) ->
discard_nodes_who_lost_their_data(Rest, KhepriClusters, LostNodes);
discard_nodes_who_lost_their_data([], KhepriClusters, []) ->
KhepriClusters;
discard_nodes_who_lost_their_data([], KhepriClusters, LostNodes) ->
?LOG_DEBUG(
"Mnesia->Khepri cluster sync: "
"Nodes who might have lost their data; "
"they will be considered unclustered: ~0p",
[lists:sort(LostNodes)],
#{domain => ?KMM_M2K_CLUSTER_SYNC_LOG_DOMAIN}),
lists:filtermap(
fun(KhepriCluster) ->
KhepriCluster1 = KhepriCluster -- LostNodes,
case KhepriCluster1 of
[] -> false;
_ -> {true, KhepriCluster1}
end
end, KhepriClusters).

-define(TREE_NODES_COUNTS_KEY, kmm_tree_nodes_counts).
-define(ERLANG_NODES_UPTIMES_KEY, kmm_erlang_node_uptimes).

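As an aside (not part of this diff), the intent of the new discard_nodes_who_lost_their_data/1 filtering can be shown with a minimal, self-contained sketch. The module and helper below are hypothetical and only re-implement the same idea: a node that reports itself as a standalone cluster while a larger cluster still lists it as a member is assumed to have lost its data.

-module(discard_lost_nodes_example).
-export([main/0]).

%% Drop any node that appears both as a standalone cluster and as a member
%% of a larger cluster: it most likely lost its data and forgot that it was
%% clustered, so the sync logic should treat it as unclustered.
discard(Clusters) ->
    Lost = [N || [N] <- Clusters,
                 lists:any(
                   fun(C) -> length(C) > 1 andalso lists:member(N, C) end,
                   Clusters)],
    [C -- Lost || C <- Clusters, (C -- Lost) =/= []].

main() ->
    %% `a' lost its disk: it sees itself as unclustered, but [a, b, c]
    %% still lists it. After filtering, `a' is gone from every cluster.
    [[b, c], [d]] = discard([[a], [a, b, c], [d]]),
    ok.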
19 changes: 14 additions & 5 deletions test/helpers.erl
@@ -12,7 +12,10 @@
-include_lib("common_test/include/ct.hrl").

-export([init_list_of_modules_to_skip/0,
start_erlang_node/1,
stop_erlang_node/2,
start_ra_system/1,
reset_ra_system/1,
stop_ra_system/1,
store_dir_name/1,
remove_store_dir/1,
@@ -38,10 +41,14 @@ init_list_of_modules_to_skip() ->
_ = application:load(khepri),
khepri_utils:init_list_of_modules_to_skip().

start_ra_system(RaSystem) ->
{ok, _} = application:ensure_all_started(ra),
start_ra_system(RaSystem) when is_atom(RaSystem) ->
StoreDir = store_dir_name(RaSystem),
Props = #{ra_system => RaSystem,
store_dir => StoreDir},
start_ra_system(Props);
start_ra_system(#{ra_system := RaSystem, store_dir := StoreDir} = Props) ->
_ = remove_store_dir(StoreDir),
{ok, _} = application:ensure_all_started(ra),
Default = ra_system:default_config(),
RaSystemConfig = Default#{name => RaSystem,
data_dir => StoreDir,
@@ -50,13 +57,15 @@ start_ra_system(RaSystem) ->
names => ra_system:derive_names(RaSystem)},
case ra_system:start(RaSystemConfig) of
{ok, RaSystemPid} ->
#{ra_system => RaSystem,
ra_system_pid => RaSystemPid,
store_dir => StoreDir};
Props#{ra_system_pid => RaSystemPid};
{error, _} = Error ->
throw(Error)
end.

reset_ra_system(Props) ->
stop_ra_system(Props),
start_ra_system(Props).

stop_ra_system(#{ra_system := RaSystem,
store_dir := StoreDir}) ->
?assertEqual(ok, supervisor:terminate_child(ra_systems_sup, RaSystem)),
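A hypothetical usage sketch of the reworked helpers (the wrapping function and the Ra system name below are assumptions for illustration, not code from this PR): start_ra_system/1 now also accepts the props map it returns, which lets reset_ra_system/1 stop the Ra system, wipe its store directory and start it again with the same settings.

%% Hypothetical call sequence in a test case:
reset_node_example() ->
    Props0 = helpers:start_ra_system(my_ra_system),
    %% ... exercise the store, then simulate the node losing its data ...
    Props1 = helpers:reset_ra_system(Props0),
    %% ... restart the Khepri store, resync, then clean up ...
    _ = helpers:stop_ra_system(Props1),
    ok.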
96 changes: 96 additions & 0 deletions test/mnesia_to_khepri_cluster_sync_SUITE.erl
@@ -27,6 +27,7 @@
nodes_not_clustered_in_mnesia_are_removed_from_khepri_2/1,
no_data_loss_in_the_largest_khepri_cluster/1,
no_data_loss_in_the_khepri_cluster_having_data/1,
can_recreate_khepri_cluster_after_losing_one_node/1,
mnesia_must_run/1,
khepri_store_must_run/1,
sort_khepri_clusters_by_members_count/1,
@@ -40,6 +41,7 @@ all() ->
nodes_not_clustered_in_mnesia_are_removed_from_khepri_2,
no_data_loss_in_the_largest_khepri_cluster,
no_data_loss_in_the_khepri_cluster_having_data,
can_recreate_khepri_cluster_after_losing_one_node,
mnesia_must_run,
khepri_store_must_run,
sort_khepri_clusters_by_members_count,
@@ -424,6 +426,100 @@ no_data_loss_in_the_khepri_cluster_having_data(Config) ->

ok.

can_recreate_khepri_cluster_after_losing_one_node(Config) ->
PropsPerNode = ?config(ra_system_props, Config),
Nodes = lists:sort(maps:keys(PropsPerNode)),
SomeNode = lists:nth(2, Nodes),

%% We assume all nodes are using the same Ra system name & store ID.
#{ra_system := RaSystem} = maps:get(SomeNode, PropsPerNode),
StoreId = RaSystem,

helpers:cluster_mnesia_nodes(Nodes),

lists:foreach(
fun(Node) ->
?assertEqual(
{ok, StoreId},
erpc:call(Node, khepri, start, [RaSystem, StoreId])),

?assertEqual(Nodes, helpers:mnesia_cluster_members(Node)),
?assertEqual(
[Node],
helpers:khepri_cluster_members(Node, StoreId))
end, Nodes),

?assertEqual(
ok,
erpc:call(
SomeNode, mnesia_to_khepri, sync_cluster_membership, [StoreId])),

lists:foreach(
fun(Node) ->
?assertEqual(Nodes, helpers:mnesia_cluster_members(Node)),
?assertEqual(
Nodes,
helpers:khepri_cluster_members(Node, StoreId))
end, Nodes),

Path = [foo],
Value = bar,
?assertEqual(ok, erpc:call(SomeNode, khepri, put, [StoreId, Path, Value])),

%% Add a random node to the mix.
{RandomNode, RandomPeer} = helpers:start_erlang_node("random-node"),
lists:foreach(
fun(Node) ->
?assertEqual(pong, erpc:call(RandomNode, net_adm, ping, [Node]))
end, Nodes),

%% Stop Khepri store on node 1 and reset it.
ct:pal("Stopping and \"losing\" a node"),
[LostNode | _] = Nodes,
LostNodeProps = maps:get(LostNode, PropsPerNode),
erpc:call(LostNode, mnesia, stop, []),
erpc:call(LostNode, khepri, stop, [StoreId]),
LostNodeProps1 = erpc:call(
LostNode, helpers, reset_ra_system, [LostNodeProps]),

ct:pal("Resetting Mnesia on lost node"),
MnesiaDir = erpc:call(LostNode, mnesia, system_info, [directory]),
ok = helpers:remove_store_dir(MnesiaDir),
erpc:call(LostNode, mnesia, start, []),

ct:pal("Restarting Khepri on lost node"),
?assertEqual(
{ok, StoreId},
erpc:call(LostNode, khepri, start, [RaSystem, StoreId])),

?assertEqual(
lists:sort([node(), RandomNode | Nodes] -- [LostNode]),
lists:sort(erpc:call(LostNode, erlang, nodes, []))),

ct:pal("Sync membership on lost node"),
?assertEqual(
ok,
erpc:call(
LostNode, mnesia_to_khepri, sync_cluster_membership, [StoreId])),

ct:pal("Verifying cluster"),
lists:foreach(
fun(Node) ->
?assertEqual(
Nodes,
helpers:khepri_cluster_members(Node, StoreId))
end, Nodes),

?assertEqual(
{ok, Value},
erpc:call(LostNode, khepri, get, [StoreId, Path])),

_ = erpc:call(LostNode, helpers, stop_ra_system, [LostNodeProps1]),

helpers:stop_erlang_node(RandomNode, RandomPeer),

ok.

mnesia_must_run(Config) ->
PropsPerNode = ?config(ra_system_props, Config),
[Node1, Node2, Node3, _, _] = Nodes = lists:sort(maps:keys(PropsPerNode)),