Skip to content

Commit dc7380d

Browse files
authored
Merge pull request #108 from rabbitmq/replica-async-init
osiris_replica: optimisations
2 parents 1fbd637 + e800bea commit dc7380d

File tree

6 files changed

+255
-164
lines changed

6 files changed

+255
-164
lines changed

src/osiris.erl

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,6 @@
2828
configure_logger/1,
2929
get_stats/1]).
3030

31-
%% holds static or rarely changing fields
32-
-record(cfg, {}).
33-
-record(?MODULE, {cfg :: #cfg{}}).
3431

3532
-type config() ::
3633
#{name := string(),
@@ -39,8 +36,6 @@
3936
retention => [osiris:retention_spec()],
4037
atom() => term()}.
4138

42-
-opaque state() :: #?MODULE{}.
43-
4439
-type mfarg() :: {module(), atom(), list()}.
4540
-type offset() :: non_neg_integer().
4641
-type tracking_id() :: osiris_tracking:tracking_id().
@@ -77,8 +72,7 @@
7772
chunk_selector => all | user_data
7873
}.
7974

80-
-export_type([state/0,
81-
config/0,
75+
-export_type([config/0,
8276
offset/0,
8377
epoch/0,
8478
tail_info/0,
@@ -200,7 +194,6 @@ init_reader(Pid, OffsetSpec, {_, _} = CounterSpec, Options)
200194
when is_pid(Pid) andalso node(Pid) =:= node() ->
201195
?DEBUG("osiris: initialising reader. Spec: ~w", [OffsetSpec]),
202196
{ok, Ctx0} = gen:call(Pid, '$gen_call', get_reader_context),
203-
% CntId = {?MODULE, Ref, Tag, Pid},
204197
Ctx = Ctx0#{counter_spec => CounterSpec,
205198
options => Options},
206199
osiris_log:init_offset_reader(OffsetSpec, Ctx).
@@ -277,9 +270,10 @@ configure_logger(Module) ->
277270
-spec get_stats(pid()) -> #{committed_chunk_id => integer(),
278271
first_chunk_id => integer()}.
279272
get_stats(Pid)
280-
when node(Pid) =:= node() ->
281-
{ok, #{offset_ref := ORef}} = gen:call(Pid, '$gen_call', get_reader_context),
282-
#{committed_chunk_id => atomics:get(ORef, 1),
283-
first_chunk_id => atomics:get(ORef, 2)};
273+
when node(Pid) =:= node() ->
274+
{ok, #{shared := Shared}} = gen:call(Pid, '$gen_call', get_reader_context),
275+
#{committed_chunk_id => osiris_log_shared:committed_chunk_id(Shared),
276+
first_chunk_id => osiris_log_shared:first_chunk_id(Shared),
277+
last_chunk_id => osiris_log_shared:last_chunk_id(Shared)};
284278
get_stats(Pid) when is_pid(Pid) ->
285279
erpc:call(node(Pid), ?MODULE, ?FUNCTION_NAME, [Pid]).

src/osiris_log.erl

Lines changed: 74 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@
3232
read_chunk_parsed/1,
3333
read_chunk_parsed/2,
3434
committed_offset/1,
35+
set_committed_chunk_id/2,
3536
get_current_epoch/1,
3637
get_directory/1,
3738
get_name/1,
39+
get_shared/1,
3840
get_default_max_segment_size_bytes/0,
3941
counters_ref/1,
4042
close/1,
@@ -327,7 +329,8 @@
327329
osiris:config() |
328330
#{dir := file:filename(),
329331
epoch => non_neg_integer(),
330-
first_offset_fun => fun((integer()) -> ok),
332+
% first_offset_fun => fun((integer()) -> ok),
333+
shared => atomics:atomics_ref(),
331334
max_segment_size_bytes => non_neg_integer(),
332335
%% max number of writer ids to keep around
333336
tracking_config => osiris_tracking:config(),
@@ -368,11 +371,10 @@
368371
%% the maximum number of active writer deduplication sessions
369372
%% that will be included in snapshots written to new segments
370373
readers_counter_fun = fun(_) -> ok end :: function(),
371-
first_offset_fun :: fun ((integer()) -> ok)}).
374+
shared :: atomics:atomics_ref()
375+
}).
372376
-record(read,
373377
{type :: data | offset,
374-
offset_ref :: undefined | atomics:atomics_ref(),
375-
last_offset = 0 :: offset(),
376378
next_offset = 0 :: offset(),
377379
transport :: transport(),
378380
chunk_selector :: all | user_data}).
@@ -466,7 +468,7 @@ init(#{dir := Dir,
466468
%% is initialised to 0 however and will be updated after each retention run.
467469
counters:put(Cnt, ?C_OFFSET, -1),
468470
counters:put(Cnt, ?C_SEGMENTS, 0),
469-
FirstOffsetFun = maps:get(first_offset_fun, Config, fun (_) -> ok end),
471+
Shared = osiris_log_shared:new(),
470472
Cfg = #cfg{directory = Dir,
471473
name = Name,
472474
max_segment_size_bytes = MaxSizeBytes,
@@ -475,7 +477,7 @@ init(#{dir := Dir,
475477
retention = Retention,
476478
counter = Cnt,
477479
counter_id = counter_id(Config),
478-
first_offset_fun = FirstOffsetFun},
480+
shared = Shared},
479481
ok = maybe_fix_corrupted_files(Config),
480482
case first_and_last_seginfos(Config) of
481483
none ->
@@ -486,7 +488,7 @@ init(#{dir := Dir,
486488
_ ->
487489
0
488490
end,
489-
FirstOffsetFun(NextOffset - 1),
491+
osiris_log_shared:set_first_chunk_id(Shared, NextOffset - 1),
490492
open_new_segment(#?MODULE{cfg = Cfg,
491493
mode =
492494
#write{type = WriterType,
@@ -498,11 +500,10 @@ init(#{dir := Dir,
498500
#seg_info{file = Filename,
499501
index = IdxFilename,
500502
size = Size,
501-
last =
502-
#chunk_info{epoch = LastEpoch,
503-
timestamp = LastTs,
504-
id = LastChId,
505-
num = LastNum}}} ->
503+
last = #chunk_info{epoch = LastEpoch,
504+
timestamp = LastTs,
505+
id = LastChId,
506+
num = LastNum}}} ->
506507
%% assert epoch is same or larger
507508
%% than last known epoch
508509
case LastEpoch > Epoch of
@@ -518,7 +519,8 @@ init(#{dir := Dir,
518519
counters:put(Cnt, ?C_FIRST_TIMESTAMP, FstTs),
519520
counters:put(Cnt, ?C_OFFSET, LastChId + LastNum - 1),
520521
counters:put(Cnt, ?C_SEGMENTS, NumSegments),
521-
FirstOffsetFun(FstChId),
522+
osiris_log_shared:set_first_chunk_id(Shared, FstChId),
523+
osiris_log_shared:set_last_chunk_id(Shared, LastChId),
522524
?DEBUG("~s:~s/~b: ~s next offset ~b first offset ~b",
523525
[?MODULE,
524526
?FUNCTION_NAME,
@@ -553,7 +555,7 @@ init(#{dir := Dir,
553555
%% here too?
554556
{ok, _} = file:position(SegFd, eof),
555557
{ok, _} = file:position(IdxFd, eof),
556-
FirstOffsetFun(-1),
558+
osiris_log_shared:set_first_chunk_id(Shared, -1),
557559
#?MODULE{cfg = Cfg,
558560
mode =
559561
#write{type = WriterType,
@@ -674,12 +676,8 @@ write(Entries, Now, #?MODULE{mode = #write{}} = State)
674676
when is_integer(Now) ->
675677
write(Entries, ?CHNK_USER, Now, <<>>, State).
676678

677-
-spec write([osiris:data()],
678-
chunk_type(),
679-
osiris:timestamp(),
680-
iodata(),
681-
state()) ->
682-
state().
679+
-spec write([osiris:data()], chunk_type(), osiris:timestamp(),
680+
iodata(), state()) -> state().
683681
write([_ | _] = Entries,
684682
ChType,
685683
Now,
@@ -689,8 +687,8 @@ write([_ | _] = Entries,
689687
#write{current_epoch = Epoch, tail_info = {Next, _}} =
690688
_Write0} =
691689
State0)
692-
when is_integer(Now)
693-
andalso is_integer(ChType) ->
690+
when is_integer(Now) andalso
691+
is_integer(ChType) ->
694692
%% The osiris writer always pass Entries in the reversed order
695693
%% in order to avoid unnecessary lists rev|trav|ersals
696694
{ChunkData, NumRecords} =
@@ -986,6 +984,7 @@ check_chunk_has_expected_epoch(ChunkId, Epoch, IdxFiles) ->
986984

987985
init_data_reader_at(ChunkId, FilePos, File,
988986
#{dir := Dir, name := Name,
987+
shared := Shared,
989988
readers_counter_fun := CountersFun} = Config) ->
990989
{ok, Fd} = file:open(File, [raw, binary, read]),
991990
{ok, FilePos} = file:position(Fd, FilePos),
@@ -998,10 +997,10 @@ init_data_reader_at(ChunkId, FilePos, File,
998997
counter_id = counter_id(Config),
999998
name = Name,
1000999
readers_counter_fun = CountersFun,
1001-
first_offset_fun = fun (_) -> ok end},
1000+
shared = Shared
1001+
},
10021002
mode =
10031003
#read{type = data,
1004-
offset_ref = maps:get(offset_ref, Config, undefined),
10051004
next_offset = ChunkId,
10061005
chunk_selector = all,
10071006
transport = maps:get(transport, Config, tcp)},
@@ -1210,7 +1209,7 @@ init_offset_reader0(OffsetSpec, #{} = Conf)
12101209
open_offset_reader_at(SegmentFile, NextChunkId, FilePos,
12111210
#{dir := Dir,
12121211
name := Name,
1213-
offset_ref := OffsetRef,
1212+
shared := Shared,
12141213
readers_counter_fun := ReaderCounterFun,
12151214
options := Options} =
12161215
Conf) ->
@@ -1223,12 +1222,11 @@ open_offset_reader_at(SegmentFile, NextChunkId, FilePos,
12231222
counter_id = counter_id(Conf),
12241223
name = Name,
12251224
readers_counter_fun = ReaderCounterFun,
1226-
first_offset_fun = fun (_) -> ok end
1225+
shared = Shared
12271226
},
12281227
mode = #read{type = offset,
12291228
chunk_selector = maps:get(chunk_selector, Options,
12301229
user_data),
1231-
offset_ref = OffsetRef,
12321230
next_offset = NextChunkId,
12331231
transport = maps:get(transport, Options, tcp)},
12341232
fd = Fd}}.
@@ -1278,11 +1276,15 @@ last_user_chunk_id_in_index(NextPos, IdxFd) ->
12781276
Error
12791277
end.
12801278

1281-
-spec committed_offset(state()) -> undefined | offset().
1282-
committed_offset(#?MODULE{mode = #read{offset_ref = undefined}}) ->
1283-
undefined;
1284-
committed_offset(#?MODULE{mode = #read{offset_ref = Ref}}) ->
1285-
atomics:get(Ref, 1).
1279+
-spec committed_offset(state()) -> integer().
1280+
committed_offset(#?MODULE{cfg = #cfg{shared = Ref}}) ->
1281+
osiris_log_shared:committed_chunk_id(Ref).
1282+
1283+
-spec set_committed_chunk_id(state(), offset()) -> ok.
1284+
set_committed_chunk_id(#?MODULE{mode = #write{},
1285+
cfg = #cfg{shared = Ref}}, ChunkId)
1286+
when is_integer(ChunkId) ->
1287+
osiris_log_shared:set_committed_chunk_id(Ref, ChunkId).
12861288

12871289
-spec get_current_epoch(state()) -> non_neg_integer().
12881290
get_current_epoch(#?MODULE{mode = #write{current_epoch = Epoch}}) ->
@@ -1296,6 +1298,10 @@ get_directory(#?MODULE{cfg = #cfg{directory = Dir}}) ->
12961298
get_name(#?MODULE{cfg = #cfg{name = Name}}) ->
12971299
Name.
12981300

1301+
-spec get_shared(state()) -> atomics:atomics_ref().
1302+
get_shared(#?MODULE{cfg = #cfg{shared = Shared}}) ->
1303+
Shared.
1304+
12991305
-spec get_default_max_segment_size_bytes() -> non_neg_integer().
13001306
get_default_max_segment_size_bytes() ->
13011307
?DEFAULT_MAX_SEGMENT_SIZE_B.
@@ -1305,8 +1311,8 @@ counters_ref(#?MODULE{cfg = #cfg{counter = C}}) ->
13051311
C.
13061312

13071313
-spec read_header(state()) ->
1308-
{ok, header_map(), state()} | {end_of_stream, state()} |
1309-
{error, {invalid_chunk_header, term()}}.
1314+
{ok, header_map(), state()} | {end_of_stream, state()} |
1315+
{error, {invalid_chunk_header, term()}}.
13101316
read_header(#?MODULE{cfg = #cfg{}} = State0) ->
13111317
%% reads the next chunk of entries, parsed
13121318
%% NB: this may return records before the requested index,
@@ -2117,7 +2123,8 @@ write_chunk(Chunk,
21172123
Timestamp,
21182124
Epoch,
21192125
NumRecords,
2120-
#?MODULE{cfg = #cfg{counter = CntRef} = Cfg,
2126+
#?MODULE{cfg = #cfg{counter = CntRef,
2127+
shared = Shared} = Cfg,
21212128
fd = Fd,
21222129
index_fd = IdxFd,
21232130
mode =
@@ -2146,6 +2153,7 @@ write_chunk(Chunk,
21462153
Epoch:64/unsigned,
21472154
Cur:32/unsigned,
21482155
ChType:8/unsigned>>),
2156+
osiris_log_shared:set_last_chunk_id(Shared, Next),
21492157
%% update counters
21502158
counters:put(CntRef, ?C_OFFSET, NextOffset - 1),
21512159
counters:add(CntRef, ?C_CHUNKS, 1),
@@ -2159,8 +2167,8 @@ write_chunk(Chunk,
21592167
end.
21602168

21612169

2162-
maybe_set_first_offset(0, #cfg{first_offset_fun = Fun}) ->
2163-
Fun(0);
2170+
maybe_set_first_offset(0, #cfg{shared = Ref}) ->
2171+
osiris_log_shared:set_first_chunk_id(Ref, 0);
21642172
maybe_set_first_offset(_, _Cfg) ->
21652173
ok.
21662174

@@ -2307,16 +2315,17 @@ find_segment_for_offset(Offset, IdxFiles) ->
23072315
not_found
23082316
end.
23092317

2310-
can_read_next_offset(#read{type = offset,
2311-
next_offset = NextOffset,
2312-
offset_ref = Ref}) ->
2313-
atomics:get(Ref, 1) >= NextOffset;
2314-
can_read_next_offset(#read{type = data}) ->
2315-
true.
2318+
can_read_next_chunk_id(#?MODULE{mode = #read{type = offset,
2319+
next_offset = NextOffset},
2320+
cfg = #cfg{shared = Ref}}) ->
2321+
osiris_log_shared:committed_chunk_id(Ref) >= NextOffset;
2322+
can_read_next_chunk_id(#?MODULE{mode = #read{type = data,
2323+
next_offset = NextOffset},
2324+
cfg = #cfg{shared = Ref}}) ->
2325+
osiris_log_shared:last_chunk_id(Ref) >= NextOffset.
23162326

23172327
incr_next_offset(Num, #read{next_offset = NextOffset} = Read) ->
2318-
Read#read{last_offset = NextOffset,
2319-
next_offset = NextOffset + Num}.
2328+
Read#read{next_offset = NextOffset + Num}.
23202329

23212330
make_file_name(N, Suff) ->
23222331
lists:flatten(
@@ -2535,14 +2544,14 @@ recover_tracking(Fd, Trk0) ->
25352544
end.
25362545

25372546
read_header0(#?MODULE{cfg = #cfg{directory = Dir,
2547+
shared = Shared,
25382548
counter = CntRef},
2539-
mode = #read{offset_ref = ORef,
2540-
next_offset = NextChId0} = Read0,
2549+
mode = #read{next_offset = NextChId0} = Read0,
25412550
current_file = CurFile,
25422551
fd = Fd} =
25432552
State) ->
25442553
%% reads the next header if permitted
2545-
case can_read_next_offset(Read0) of
2554+
case can_read_next_chunk_id(State) of
25462555
true ->
25472556
{ok, Pos} = file:position(Fd, cur),
25482557
case file:read(Fd, ?HEADER_SIZE_B) of
@@ -2583,9 +2592,11 @@ read_header0(#?MODULE{cfg = #cfg{directory = Dir,
25832592
{ok, Pos} = file:position(Fd, Pos),
25842593
{end_of_stream, State};
25852594
eof ->
2586-
FirstOffset = atomics:get(ORef, 2),
2595+
FirstOffset = osiris_log_shared:first_chunk_id(Shared),
25872596
%% open next segment file and start there if it exists
25882597
NextChId = max(FirstOffset, NextChId0),
2598+
%% TODO: replace this check with a last chunk id counter
2599+
%% updated by the writer and replicas
25892600
SegFile = make_file_name(NextChId, "segment"),
25902601
case SegFile == CurFile of
25912602
true ->
@@ -2640,20 +2651,20 @@ trigger_retention_eval(#?MODULE{cfg =
26402651
#cfg{directory = Dir,
26412652
retention = RetentionSpec,
26422653
counter = Cnt,
2643-
first_offset_fun = Fun}} = State) ->
2644-
ok =
2645-
osiris_retention:eval(Dir, RetentionSpec,
2646-
%% updates first offset and first timestamp
2647-
%% after retention has been evaluated
2648-
fun ({{FstOff, _}, FstTs, Seg}) when is_integer(FstOff),
2649-
is_integer(FstTs) ->
2650-
Fun(FstOff),
2651-
counters:put(Cnt, ?C_FIRST_OFFSET, FstOff),
2652-
counters:put(Cnt, ?C_FIRST_TIMESTAMP, FstTs),
2653-
counters:put(Cnt, ?C_SEGMENTS, Seg);
2654-
(_) ->
2655-
ok
2656-
end),
2654+
shared = Shared}} = State) ->
2655+
2656+
%% updates first offset and first timestamp
2657+
%% after retention has been evaluated
2658+
EvalFun = fun ({{FstOff, _}, FstTs, Seg}) when is_integer(FstOff),
2659+
is_integer(FstTs) ->
2660+
osiris_log_shared:set_first_chunk_id(Shared, FstOff),
2661+
counters:put(Cnt, ?C_FIRST_OFFSET, FstOff),
2662+
counters:put(Cnt, ?C_FIRST_TIMESTAMP, FstTs),
2663+
counters:put(Cnt, ?C_SEGMENTS, Seg);
2664+
(_) ->
2665+
ok
2666+
end,
2667+
ok = osiris_retention:eval(Dir, RetentionSpec, EvalFun),
26572668
State.
26582669

26592670
next_location(undefined) ->

0 commit comments

Comments
 (0)