Skip to content

Commit c744a0f

Browse files
authored
Merge pull request #110 from rabbitmq/scheduled-retention-eval
Scheduled retention re-evaluation for max_age configured streams
2 parents dc7380d + ecc8a70 commit c744a0f

File tree

10 files changed

+265
-79
lines changed

10 files changed

+265
-79
lines changed

src/osiris.erl

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@
2929
get_stats/1]).
3030

3131

32+
-type name() :: string() | binary().
3233
-type config() ::
33-
#{name := string(),
34+
#{name := name(),
3435
reference => term(),
3536
event_formatter => {module(), atom(), list()},
3637
retention => [osiris:retention_spec()],
@@ -72,7 +73,8 @@
7273
chunk_selector => all | user_data
7374
}.
7475

75-
-export_type([config/0,
76+
-export_type([name/0,
77+
config/0,
7678
offset/0,
7779
epoch/0,
7880
tail_info/0,
@@ -84,8 +86,9 @@
8486
data/0]).
8587

8688
-spec start_cluster(config()) ->
87-
{ok, config()} | {error, term()} |
88-
{error, term(), config()}.
89+
{ok, config()} |
90+
{error, term()} |
91+
{error, term(), config()}.
8992
start_cluster(Config00 = #{name := Name}) ->
9093
?DEBUG("osiris: starting new cluster ~s", [Name]),
9194
true = osiris_util:validate_base64uri(Name),
@@ -124,7 +127,7 @@ start_replica(Replica, Config) ->
124127
WriterId :: binary() | undefined,
125128
CorrOrSeq :: non_neg_integer() | term(),
126129
Data :: data()) ->
127-
ok.
130+
ok.
128131
write(Pid, WriterId, Corr, Data) ->
129132
osiris_writer:write(Pid, self(), WriterId, Corr, Data).
130133

@@ -221,7 +224,7 @@ register_offset_listener(Pid, Offset, EvtFormatter) ->
221224
ok.
222225

223226
-spec update_retention(pid(), [osiris:retention_spec()]) ->
224-
ok | {error, term()}.
227+
ok | {error, term()}.
225228
update_retention(Pid, Retention)
226229
when is_pid(Pid) andalso is_list(Retention) ->
227230
Msg = {update_retention, Retention},

src/osiris.hrl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
domain => [osiris]}),
3131
ok).
3232

33+
-define(IS_STRING(S), is_list(S) orelse is_binary(S)).
34+
3335
-define(C_NUM_LOG_FIELDS, 5).
3436

3537
-define(MAGIC, 5).

src/osiris_log.erl

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@
327327
?CHNK_TRK_SNAPSHOT.
328328
-type config() ::
329329
osiris:config() |
330-
#{dir := file:filename(),
330+
#{dir := file:filename_all(),
331331
epoch => non_neg_integer(),
332332
% first_offset_fun => fun((integer()) -> ok),
333333
shared => atomics:atomics_ref(),
@@ -340,7 +340,7 @@
340340
%% a cached list of the index files for a given log
341341
%% avoids scanning disk for files multiple times if already know
342342
%% e.g. in init_acceptor
343-
index_files => [filename:filename()]}.
343+
index_files => [file:filename_all()]}.
344344
-type record() :: {offset(), osiris:data()}.
345345
-type offset_spec() :: osiris:offset_spec().
346346
-type retention_spec() :: osiris:retention_spec().
@@ -360,8 +360,8 @@
360360

361361
%% holds static or rarely changing fields
362362
-record(cfg,
363-
{directory :: file:filename(),
364-
name :: string(),
363+
{directory :: file:filename_all(),
364+
name :: osiris:name(),
365365
max_segment_size_bytes = ?DEFAULT_MAX_SEGMENT_SIZE_B :: non_neg_integer(),
366366
max_segment_size_chunks = ?DEFAULT_MAX_SEGMENT_SIZE_C :: non_neg_integer(),
367367
tracking_config = #{} :: osiris_tracking:config(),
@@ -392,7 +392,7 @@
392392
-record(?MODULE,
393393
{cfg :: #cfg{},
394394
mode :: #read{} | #write{},
395-
current_file :: undefined | file:filename(),
395+
current_file :: undefined | file:filename_all(),
396396
index_fd :: undefined | file:io_device(),
397397
fd :: undefined | file:io_device()
398398
}).
@@ -409,9 +409,9 @@
409409
pos :: integer()
410410
}).
411411
-record(seg_info,
412-
{file :: file:filename(),
412+
{file :: file:filename_all(),
413413
size = 0 :: non_neg_integer(),
414-
index :: file:filename(),
414+
index :: file:filename_all(),
415415
first :: undefined | #chunk_info{},
416416
last :: undefined | #chunk_info{}}).
417417

@@ -422,13 +422,13 @@
422422
config/0,
423423
counter_spec/0]).
424424

425-
-spec directory(osiris:config() | list()) -> file:filename().
425+
-spec directory(osiris:config() | list()) -> file:filename_all().
426426
directory(#{name := Name, dir := Dir}) ->
427427
filename:join(Dir, Name);
428428
directory(#{name := Name}) ->
429429
{ok, Dir} = application:get_env(osiris, data_dir),
430430
filename:join(Dir, Name);
431-
directory(Name) when is_list(Name) ->
431+
directory(Name) when ?IS_STRING(Name) ->
432432
{ok, Dir} = application:get_env(osiris, data_dir),
433433
filename:join(Dir, Name).
434434

@@ -808,7 +808,8 @@ init_acceptor(Range, EpochOffsets0,
808808
init(Conf#{initial_offset => InitOffset,
809809
index_files => RemIdxFiles}, acceptor).
810810

811-
chunk_id_index_scan(IdxFile, ChunkId) when is_list(IdxFile) ->
811+
chunk_id_index_scan(IdxFile, ChunkId)
812+
when ?IS_STRING(IdxFile) ->
812813
Fd = open_index_read(IdxFile),
813814
chunk_id_index_scan0(Fd, ChunkId).
814815

@@ -1232,7 +1233,8 @@ open_offset_reader_at(SegmentFile, NextChunkId, FilePos,
12321233
fd = Fd}}.
12331234

12341235
%% Searches the index files backwards for the ID of the last user chunk.
1235-
last_user_chunk_location(RevdIdxFiles) when is_list(RevdIdxFiles) ->
1236+
last_user_chunk_location(RevdIdxFiles)
1237+
when is_list(RevdIdxFiles) ->
12361238
{Time, Result} = timer:tc(
12371239
fun() ->
12381240
last_user_chunk_id0(RevdIdxFiles)
@@ -1290,7 +1292,7 @@ set_committed_chunk_id(#?MODULE{mode = #write{},
12901292
get_current_epoch(#?MODULE{mode = #write{current_epoch = Epoch}}) ->
12911293
Epoch.
12921294

1293-
-spec get_directory(state()) -> file:filename().
1295+
-spec get_directory(state()) -> file:filename_all().
12941296
get_directory(#?MODULE{cfg = #cfg{directory = Dir}}) ->
12951297
Dir.
12961298

@@ -1554,7 +1556,7 @@ close(#?MODULE{cfg = #cfg{counter_id = CntId,
15541556

15551557
delete_directory(#{name := Name} = Config) when is_map(Config) ->
15561558
delete_directory(Name);
1557-
delete_directory(Name) when is_list(Name) ->
1559+
delete_directory(Name) when ?IS_STRING(Name) ->
15581560
Dir = directory(Name),
15591561
?DEBUG("osiris_log: deleting directory ~s", [Dir]),
15601562
case file:list_dir(Dir) of
@@ -1630,7 +1632,7 @@ sorted_index_files(#{index_files := IdxFiles}) ->
16301632
IdxFiles;
16311633
sorted_index_files(#{dir := Dir}) ->
16321634
sorted_index_files(Dir);
1633-
sorted_index_files(Dir) when is_list(Dir) orelse is_binary(Dir) ->
1635+
sorted_index_files(Dir) when ?IS_STRING(Dir) ->
16341636
Files = index_files_unsorted(Dir),
16351637
lists:sort(Files).
16361638

@@ -1649,7 +1651,9 @@ index_files_unsorted(Dir) ->
16491651
[];
16501652
{ok, Files} ->
16511653
[filename:join(Dir, F)
1652-
|| F <- Files, filename:extension(F) == ".index"]
1654+
|| F <- Files,
1655+
filename:extension(F) == ".index" orelse
1656+
filename:extension(F) == <<".index">>]
16531657
end.
16541658

16551659
first_and_last_seginfos(#{index_files := IdxFiles}) ->
@@ -1707,7 +1711,7 @@ build_seg_info(IdxFile) ->
17071711
last_idx_record(IdxFd) ->
17081712
nth_last_idx_record(IdxFd, 1).
17091713

1710-
nth_last_idx_record(IdxFile, N) when is_list(IdxFile) ->
1714+
nth_last_idx_record(IdxFile, N) when ?IS_STRING(IdxFile) ->
17111715
{ok, IdxFd} = open(IdxFile, [read, raw, binary]),
17121716
IdxRecord = nth_last_idx_record(IdxFd, N),
17131717
_ = file:close(IdxFd),
@@ -1888,7 +1892,8 @@ update_retention(Retention,
18881892
trigger_retention_eval(State).
18891893

18901894
-spec evaluate_retention(file:filename_all(), [retention_spec()]) ->
1891-
{range(), non_neg_integer()}.
1895+
{range(), FirstTimestamp :: osiris:timestamp(),
1896+
NumRemainingFiles :: non_neg_integer()}.
18921897
evaluate_retention(Dir, Specs) when is_list(Dir) ->
18931898
% convert to binary for faster operations later
18941899
% mostly in segment_from_index_file/1
@@ -2426,8 +2431,8 @@ throw_missing({error, enoent}) ->
24262431
throw_missing(Any) ->
24272432
Any.
24282433

2429-
open(SegFile, Options) ->
2430-
throw_missing(file:open(SegFile, Options)).
2434+
open(File, Options) ->
2435+
throw_missing(file:open(File, Options)).
24312436

24322437
chunk_location_for_timestamp(Idx, Ts) ->
24332438
Fd = open_index_read(Idx),
@@ -2648,23 +2653,25 @@ read_header0(#?MODULE{cfg = #cfg{directory = Dir,
26482653
end.
26492654

26502655
trigger_retention_eval(#?MODULE{cfg =
2651-
#cfg{directory = Dir,
2656+
#cfg{name = Name,
2657+
directory = Dir,
26522658
retention = RetentionSpec,
26532659
counter = Cnt,
26542660
shared = Shared}} = State) ->
26552661

26562662
%% updates first offset and first timestamp
26572663
%% after retention has been evaluated
2658-
EvalFun = fun ({{FstOff, _}, FstTs, Seg}) when is_integer(FstOff),
2659-
is_integer(FstTs) ->
2664+
EvalFun = fun ({{FstOff, _}, FstTs, NumSegLeft})
2665+
when is_integer(FstOff),
2666+
is_integer(FstTs) ->
26602667
osiris_log_shared:set_first_chunk_id(Shared, FstOff),
26612668
counters:put(Cnt, ?C_FIRST_OFFSET, FstOff),
26622669
counters:put(Cnt, ?C_FIRST_TIMESTAMP, FstTs),
2663-
counters:put(Cnt, ?C_SEGMENTS, Seg);
2670+
counters:put(Cnt, ?C_SEGMENTS, NumSegLeft);
26642671
(_) ->
26652672
ok
26662673
end,
2667-
ok = osiris_retention:eval(Dir, RetentionSpec, EvalFun),
2674+
ok = osiris_retention:eval(Name, Dir, RetentionSpec, EvalFun),
26682675
State.
26692676

26702677
next_location(undefined) ->
@@ -2675,8 +2682,10 @@ next_location(#chunk_info{id = Id,
26752682
size = Size}) ->
26762683
{Id + Num, Pos + Size + ?HEADER_SIZE_B}.
26772684

2678-
index_file_first_offset(IdxFile) ->
2679-
list_to_integer(filename:basename(IdxFile, ".index")).
2685+
index_file_first_offset(IdxFile) when is_list(IdxFile) ->
2686+
list_to_integer(filename:basename(IdxFile, ".index"));
2687+
index_file_first_offset(IdxFile) when is_binary(IdxFile) ->
2688+
binary_to_integer(filename:basename(IdxFile, <<".index">>)).
26802689

26812690
first_last_timestamps(IdxFile) ->
26822691
case file:open(IdxFile, [raw, read, binary]) of

src/osiris_replica.erl

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,11 @@
4949

5050
%% holds static or rarely changing fields
5151
-record(cfg,
52-
{name :: string(),
52+
{name :: osiris:name(),
5353
leader_pid :: pid(),
5454
acceptor_pid :: pid(),
5555
replica_reader_pid :: pid(),
56-
directory :: file:filename(),
56+
directory :: file:filename_all(),
5757
port :: non_neg_integer(),
5858
transport :: osiris_log:transport(),
5959
socket :: undefined | gen_tcp:socket() | ssl:sslsocket(),
@@ -99,7 +99,7 @@
9999
%%% API functions
100100
%%%===================================================================
101101

102-
start(Node, Config = #{name := Name}) when is_list(Name) ->
102+
start(Node, Config = #{name := Name}) when ?IS_STRING(Name) ->
103103
case supervisor:start_child({?SUP, Node},
104104
#{id => Name,
105105
start => {?MODULE, start_link, [Config]},
@@ -158,9 +158,11 @@ await(Server) ->
158158
init(Config) ->
159159
{ok, undefined, {continue, Config}}.
160160

161-
handle_continue(#{name := Name,
161+
handle_continue(#{name := Name0,
162162
leader_pid := LeaderPid,
163-
reference := ExtRef} = Config, undefined) ->
163+
reference := ExtRef} = Config, undefined)
164+
when ?IS_STRING(Name0) ->
165+
Name = osiris_util:normalise_name(Name0),
164166
process_flag(trap_exit, true),
165167
process_flag(message_queue_data, off_heap),
166168
Node = node(LeaderPid),

src/osiris_replica_reader.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131

3232
-record(state,
3333
{log :: osiris_log:state(),
34-
name :: string(),
34+
name :: osiris:name(),
3535
transport :: osiris_log:transport(),
3636
socket :: gen_tcp:socket() | ssl:sslsocket(),
3737
replica_pid :: pid(),
@@ -183,7 +183,6 @@ init(#{hosts := Hosts,
183183
leader_monitor_ref = MRef,
184184
counter = CntRef,
185185
counter_id = CntId}),
186-
?DEBUG("sent committed offset information to the leader at ~p", [LeaderPid]),
187186
{ok, State}
188187
catch
189188
exit:{noproc, _} ->

0 commit comments

Comments
 (0)