Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions src/osiris.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@
get_stats/1]).


-type name() :: string() | binary().
-type config() ::
#{name := string(),
#{name := name(),
reference => term(),
event_formatter => {module(), atom(), list()},
retention => [osiris:retention_spec()],
Expand Down Expand Up @@ -72,7 +73,8 @@
chunk_selector => all | user_data
}.

-export_type([config/0,
-export_type([name/0,
config/0,
offset/0,
epoch/0,
tail_info/0,
Expand All @@ -84,8 +86,9 @@
data/0]).

-spec start_cluster(config()) ->
{ok, config()} | {error, term()} |
{error, term(), config()}.
{ok, config()} |
{error, term()} |
{error, term(), config()}.
start_cluster(Config00 = #{name := Name}) ->
?DEBUG("osiris: starting new cluster ~s", [Name]),
true = osiris_util:validate_base64uri(Name),
Expand Down Expand Up @@ -124,7 +127,7 @@ start_replica(Replica, Config) ->
WriterId :: binary() | undefined,
CorrOrSeq :: non_neg_integer() | term(),
Data :: data()) ->
ok.
ok.
write(Pid, WriterId, Corr, Data) ->
osiris_writer:write(Pid, self(), WriterId, Corr, Data).

Expand Down Expand Up @@ -221,7 +224,7 @@ register_offset_listener(Pid, Offset, EvtFormatter) ->
ok.

-spec update_retention(pid(), [osiris:retention_spec()]) ->
ok | {error, term()}.
ok | {error, term()}.
update_retention(Pid, Retention)
when is_pid(Pid) andalso is_list(Retention) ->
Msg = {update_retention, Retention},
Expand Down
2 changes: 2 additions & 0 deletions src/osiris.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
domain => [osiris]}),
ok).

-define(IS_STRING(S), is_list(S) orelse is_binary(S)).

-define(C_NUM_LOG_FIELDS, 5).

-define(MAGIC, 5).
Expand Down
61 changes: 35 additions & 26 deletions src/osiris_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@
?CHNK_TRK_SNAPSHOT.
-type config() ::
osiris:config() |
#{dir := file:filename(),
#{dir := file:filename_all(),
epoch => non_neg_integer(),
% first_offset_fun => fun((integer()) -> ok),
shared => atomics:atomics_ref(),
Expand All @@ -340,7 +340,7 @@
%% a cached list of the index files for a given log
%% avoids scanning disk for files multiple times if already know
%% e.g. in init_acceptor
index_files => [filename:filename()]}.
index_files => [file:filename_all()]}.
-type record() :: {offset(), osiris:data()}.
-type offset_spec() :: osiris:offset_spec().
-type retention_spec() :: osiris:retention_spec().
Expand All @@ -360,8 +360,8 @@

%% holds static or rarely changing fields
-record(cfg,
{directory :: file:filename(),
name :: string(),
{directory :: file:filename_all(),
name :: osiris:name(),
max_segment_size_bytes = ?DEFAULT_MAX_SEGMENT_SIZE_B :: non_neg_integer(),
max_segment_size_chunks = ?DEFAULT_MAX_SEGMENT_SIZE_C :: non_neg_integer(),
tracking_config = #{} :: osiris_tracking:config(),
Expand Down Expand Up @@ -392,7 +392,7 @@
-record(?MODULE,
{cfg :: #cfg{},
mode :: #read{} | #write{},
current_file :: undefined | file:filename(),
current_file :: undefined | file:filename_all(),
index_fd :: undefined | file:io_device(),
fd :: undefined | file:io_device()
}).
Expand All @@ -409,9 +409,9 @@
pos :: integer()
}).
-record(seg_info,
{file :: file:filename(),
{file :: file:filename_all(),
size = 0 :: non_neg_integer(),
index :: file:filename(),
index :: file:filename_all(),
first :: undefined | #chunk_info{},
last :: undefined | #chunk_info{}}).

Expand All @@ -422,13 +422,13 @@
config/0,
counter_spec/0]).

-spec directory(osiris:config() | list()) -> file:filename().
-spec directory(osiris:config() | list()) -> file:filename_all().
directory(#{name := Name, dir := Dir}) ->
filename:join(Dir, Name);
directory(#{name := Name}) ->
{ok, Dir} = application:get_env(osiris, data_dir),
filename:join(Dir, Name);
directory(Name) when is_list(Name) ->
directory(Name) when ?IS_STRING(Name) ->
{ok, Dir} = application:get_env(osiris, data_dir),
filename:join(Dir, Name).

Expand Down Expand Up @@ -808,7 +808,8 @@ init_acceptor(Range, EpochOffsets0,
init(Conf#{initial_offset => InitOffset,
index_files => RemIdxFiles}, acceptor).

chunk_id_index_scan(IdxFile, ChunkId) when is_list(IdxFile) ->
chunk_id_index_scan(IdxFile, ChunkId)
when ?IS_STRING(IdxFile) ->
Fd = open_index_read(IdxFile),
chunk_id_index_scan0(Fd, ChunkId).

Expand Down Expand Up @@ -1232,7 +1233,8 @@ open_offset_reader_at(SegmentFile, NextChunkId, FilePos,
fd = Fd}}.

%% Searches the index files backwards for the ID of the last user chunk.
last_user_chunk_location(RevdIdxFiles) when is_list(RevdIdxFiles) ->
last_user_chunk_location(RevdIdxFiles)
when is_list(RevdIdxFiles) ->
{Time, Result} = timer:tc(
fun() ->
last_user_chunk_id0(RevdIdxFiles)
Expand Down Expand Up @@ -1290,7 +1292,7 @@ set_committed_chunk_id(#?MODULE{mode = #write{},
get_current_epoch(#?MODULE{mode = #write{current_epoch = Epoch}}) ->
Epoch.

-spec get_directory(state()) -> file:filename().
-spec get_directory(state()) -> file:filename_all().
get_directory(#?MODULE{cfg = #cfg{directory = Dir}}) ->
Dir.

Expand Down Expand Up @@ -1554,7 +1556,7 @@ close(#?MODULE{cfg = #cfg{counter_id = CntId,

delete_directory(#{name := Name} = Config) when is_map(Config) ->
delete_directory(Name);
delete_directory(Name) when is_list(Name) ->
delete_directory(Name) when ?IS_STRING(Name) ->
Dir = directory(Name),
?DEBUG("osiris_log: deleting directory ~s", [Dir]),
case file:list_dir(Dir) of
Expand Down Expand Up @@ -1630,7 +1632,7 @@ sorted_index_files(#{index_files := IdxFiles}) ->
IdxFiles;
sorted_index_files(#{dir := Dir}) ->
sorted_index_files(Dir);
sorted_index_files(Dir) when is_list(Dir) orelse is_binary(Dir) ->
sorted_index_files(Dir) when ?IS_STRING(Dir) ->
Files = index_files_unsorted(Dir),
lists:sort(Files).

Expand All @@ -1649,7 +1651,9 @@ index_files_unsorted(Dir) ->
[];
{ok, Files} ->
[filename:join(Dir, F)
|| F <- Files, filename:extension(F) == ".index"]
|| F <- Files,
filename:extension(F) == ".index" orelse
filename:extension(F) == <<".index">>]
end.

first_and_last_seginfos(#{index_files := IdxFiles}) ->
Expand Down Expand Up @@ -1707,7 +1711,7 @@ build_seg_info(IdxFile) ->
last_idx_record(IdxFd) ->
nth_last_idx_record(IdxFd, 1).

nth_last_idx_record(IdxFile, N) when is_list(IdxFile) ->
nth_last_idx_record(IdxFile, N) when ?IS_STRING(IdxFile) ->
{ok, IdxFd} = open(IdxFile, [read, raw, binary]),
IdxRecord = nth_last_idx_record(IdxFd, N),
_ = file:close(IdxFd),
Expand Down Expand Up @@ -1888,7 +1892,8 @@ update_retention(Retention,
trigger_retention_eval(State).

-spec evaluate_retention(file:filename_all(), [retention_spec()]) ->
{range(), non_neg_integer()}.
{range(), FirstTimestamp :: osiris:timestamp(),
NumRemainingFiles :: non_neg_integer()}.
evaluate_retention(Dir, Specs) when is_list(Dir) ->
% convert to binary for faster operations later
% mostly in segment_from_index_file/1
Expand Down Expand Up @@ -2426,8 +2431,8 @@ throw_missing({error, enoent}) ->
throw_missing(Any) ->
Any.

open(SegFile, Options) ->
throw_missing(file:open(SegFile, Options)).
open(File, Options) ->
throw_missing(file:open(File, Options)).

chunk_location_for_timestamp(Idx, Ts) ->
Fd = open_index_read(Idx),
Expand Down Expand Up @@ -2648,23 +2653,25 @@ read_header0(#?MODULE{cfg = #cfg{directory = Dir,
end.

trigger_retention_eval(#?MODULE{cfg =
#cfg{directory = Dir,
#cfg{name = Name,
directory = Dir,
retention = RetentionSpec,
counter = Cnt,
shared = Shared}} = State) ->

%% updates first offset and first timestamp
%% after retention has been evaluated
EvalFun = fun ({{FstOff, _}, FstTs, Seg}) when is_integer(FstOff),
is_integer(FstTs) ->
EvalFun = fun ({{FstOff, _}, FstTs, NumSegLeft})
when is_integer(FstOff),
is_integer(FstTs) ->
osiris_log_shared:set_first_chunk_id(Shared, FstOff),
counters:put(Cnt, ?C_FIRST_OFFSET, FstOff),
counters:put(Cnt, ?C_FIRST_TIMESTAMP, FstTs),
counters:put(Cnt, ?C_SEGMENTS, Seg);
counters:put(Cnt, ?C_SEGMENTS, NumSegLeft);
(_) ->
ok
end,
ok = osiris_retention:eval(Dir, RetentionSpec, EvalFun),
ok = osiris_retention:eval(Name, Dir, RetentionSpec, EvalFun),
State.

next_location(undefined) ->
Expand All @@ -2675,8 +2682,10 @@ next_location(#chunk_info{id = Id,
size = Size}) ->
{Id + Num, Pos + Size + ?HEADER_SIZE_B}.

index_file_first_offset(IdxFile) ->
list_to_integer(filename:basename(IdxFile, ".index")).
index_file_first_offset(IdxFile) when is_list(IdxFile) ->
list_to_integer(filename:basename(IdxFile, ".index"));
index_file_first_offset(IdxFile) when is_binary(IdxFile) ->
binary_to_integer(filename:basename(IdxFile, <<".index">>)).

first_last_timestamps(IdxFile) ->
case file:open(IdxFile, [raw, read, binary]) of
Expand Down
12 changes: 7 additions & 5 deletions src/osiris_replica.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@

%% holds static or rarely changing fields
-record(cfg,
{name :: string(),
{name :: osiris:name(),
leader_pid :: pid(),
acceptor_pid :: pid(),
replica_reader_pid :: pid(),
directory :: file:filename(),
directory :: file:filename_all(),
port :: non_neg_integer(),
transport :: osiris_log:transport(),
socket :: undefined | gen_tcp:socket() | ssl:sslsocket(),
Expand Down Expand Up @@ -99,7 +99,7 @@
%%% API functions
%%%===================================================================

start(Node, Config = #{name := Name}) when is_list(Name) ->
start(Node, Config = #{name := Name}) when ?IS_STRING(Name) ->
case supervisor:start_child({?SUP, Node},
#{id => Name,
start => {?MODULE, start_link, [Config]},
Expand Down Expand Up @@ -158,9 +158,11 @@ await(Server) ->
init(Config) ->
{ok, undefined, {continue, Config}}.

handle_continue(#{name := Name,
handle_continue(#{name := Name0,
leader_pid := LeaderPid,
reference := ExtRef} = Config, undefined) ->
reference := ExtRef} = Config, undefined)
when ?IS_STRING(Name0) ->
Name = osiris_util:normalise_name(Name0),
process_flag(trap_exit, true),
process_flag(message_queue_data, off_heap),
Node = node(LeaderPid),
Expand Down
3 changes: 1 addition & 2 deletions src/osiris_replica_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

-record(state,
{log :: osiris_log:state(),
name :: string(),
name :: osiris:name(),
transport :: osiris_log:transport(),
socket :: gen_tcp:socket() | ssl:sslsocket(),
replica_pid :: pid(),
Expand Down Expand Up @@ -183,7 +183,6 @@ init(#{hosts := Hosts,
leader_monitor_ref = MRef,
counter = CntRef,
counter_id = CntId}),
?DEBUG("sent committed offset information to the leader at ~p", [LeaderPid]),
{ok, State}
catch
exit:{noproc, _} ->
Expand Down
Loading