Skip to content

Commit 4139d63

Browse files
committed
Periodically re-evaluate max_age retention
This implements periodic re-evaluation of max_age retention specificions so that streams that have little activity but more than one segments will evantually be truncated.
1 parent c423e2c commit 4139d63

File tree

2 files changed

+94
-4
lines changed

2 files changed

+94
-4
lines changed

src/osiris_retention.erl

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,11 @@ handle_call(_Request, _From, State) ->
6565
%% @spec handle_cast(Msg, State) -> {noreply, State} |
6666
%% {noreply, State, Timeout} |
6767
%% {stop, Reason, State}
68-
handle_cast({eval, Name, Dir, Specs, Fun}, State) ->
68+
handle_cast({eval, _Name, Dir, Specs, Fun} = Eval, State) ->
69+
ct:pal("EVAL ~p", [Eval]),
6970
Result = osiris_log:evaluate_retention(Dir, Specs),
7071
_ = Fun(Result),
71-
{noreply, schedule(osiris_util:normalise_name(Name), Specs, Result, State)}.
72+
{noreply, schedule(Eval, Result, State)}.
7273

7374
%% @spec handle_info(Info, State) -> {noreply, State} |
7475
%% {noreply, State, Timeout} |
@@ -87,6 +88,25 @@ code_change(_OldVsn, State, _Extra) ->
8788
%%%===================================================================
8889
%%% Internal functions
8990
%%%===================================================================
90-
schedule(_Name, _Specs, _Result, State) ->
91-
State.
91+
schedule({eval, Name, _Dir, Specs, _Fun} = Eval, _Result,
92+
#state{scheduled = Scheduled0} = State) ->
93+
%% we need to check the scheduled map even if the current specs do not
94+
%% include max_age as the retention config could have changed
95+
Scheduled = case maps:take(Name, Scheduled0) of
96+
{OldRef, Scheduled1} ->
97+
_ = erlang:cancel_timer(OldRef),
98+
Scheduled1;
99+
error ->
100+
Scheduled0
101+
end,
102+
case lists:any(fun ({T, _}) -> T == max_age end, Specs) of
103+
true ->
104+
%% schedule a new eval
105+
EvalInterval = application:get_env(osiris, retention_eval_interval,
106+
?DEFAULT_SHEDULED_EVAL_TIME),
107+
Ref = erlang:send_after(EvalInterval, self(), {'$gen_cast', Eval}),
108+
State#state{scheduled = Scheduled#{Name => Ref}};
109+
false ->
110+
State#state{scheduled = Scheduled}
111+
end.
92112

test/osiris_SUITE.erl

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ all_tests() ->
5353
replica_unknown_command,
5454
diverged_replica,
5555
retention,
56+
retention_max_age_eventually,
57+
retention_max_age_update_retention,
5658
retention_add_replica_after,
5759
retention_overtakes_offset_reader,
5860
update_retention,
@@ -1033,6 +1035,7 @@ retention(Config) ->
10331035
Num = 150000,
10341036
Name = ?config(cluster_name, Config),
10351037
SegSize = 50000 * 1000,
1038+
%% silly low setting
10361039
Conf0 =
10371040
#{name => Name,
10381041
epoch => 1,
@@ -1051,6 +1054,58 @@ retention(Config) ->
10511054
osiris:stop_cluster(Conf1),
10521055
ok.
10531056

1057+
retention_max_age_eventually(Config) ->
1058+
DataDir = ?config(data_dir, Config),
1059+
Num = 150000,
1060+
Name = ?config(cluster_name, Config),
1061+
SegSize = 50000 * 1000,
1062+
application:set_env(osiris, retention_eval_interval, 5000),
1063+
Conf0 =
1064+
#{name => Name,
1065+
epoch => 1,
1066+
leader_node => node(),
1067+
retention => [{max_age, 5000}],
1068+
max_segment_size_bytes => SegSize,
1069+
replica_nodes => []},
1070+
{ok, #{leader_pid := Leader, replica_pids := []} = Conf1} =
1071+
osiris:start_cluster(Conf0),
1072+
timer:sleep(100),
1073+
write_n(Leader, Num, 0, 1000 * 8, #{}),
1074+
Wc = filename:join([DataDir, ?FUNCTION_NAME, "*.segment"]),
1075+
%% one file only
1076+
await_condition(fun () ->
1077+
length(filelib:wildcard(Wc)) == 1
1078+
end, 1000, 20),
1079+
osiris:stop_cluster(Conf1),
1080+
ok.
1081+
1082+
retention_max_age_update_retention(Config) ->
1083+
%% ensure a retention update cancels the current scheduled
1084+
%% retention evaluation
1085+
DataDir = ?config(data_dir, Config),
1086+
Num = 150000,
1087+
Name = ?config(cluster_name, Config),
1088+
SegSize = 50000 * 1000,
1089+
application:set_env(osiris, retention_eval_interval, 5000),
1090+
Conf0 =
1091+
#{name => Name,
1092+
epoch => 1,
1093+
leader_node => node(),
1094+
retention => [{max_age, 5000}],
1095+
max_segment_size_bytes => SegSize,
1096+
replica_nodes => []},
1097+
{ok, #{leader_pid := Leader, replica_pids := []} = Conf1} =
1098+
osiris:start_cluster(Conf0),
1099+
timer:sleep(100),
1100+
write_n(Leader, Num, 0, 1000 * 8, #{}),
1101+
Wc = filename:join([DataDir, ?FUNCTION_NAME, "*.segment"]),
1102+
ok = osiris:update_retention(Leader, [{max_bytes, SegSize * 10}]),
1103+
timer:sleep(10000),
1104+
%% one file only
1105+
?assertNot(length(filelib:wildcard(Wc)) == 1),
1106+
osiris:stop_cluster(Conf1),
1107+
ok.
1108+
10541109
retention_add_replica_after(Config) ->
10551110
DataDir = ?config(data_dir, Config),
10561111
Num = 150000,
@@ -1960,4 +2015,19 @@ simple(Bin) ->
19602015
S = byte_size(Bin),
19612016
<<0:1, S:31, Bin/binary>>.
19622017

2018+
await_condition(_CondFun, _Sleep, 0) ->
2019+
exit(await_condition_attempts_exceeded);
2020+
await_condition(CondFun, Sleep, Attempt) ->
2021+
timer:sleep(Sleep),
2022+
try CondFun() of
2023+
true ->
2024+
ok;
2025+
false ->
2026+
await_condition(CondFun, Sleep, Attempt-1)
2027+
catch
2028+
_:Err ->
2029+
ct:pal("~s err ~p", [?FUNCTION_NAME, Err]),
2030+
await_condition(CondFun, Sleep, Attempt-1)
2031+
end.
2032+
19632033

0 commit comments

Comments
 (0)