Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 68 additions & 27 deletions deps/rabbit/src/rabbit_msg_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@
current_file,
%% current file handle since the last fsync?
current_file_handle,
%% file handle cache
%% current write file offset
current_file_offset,
%% messages that were potentially removed from the current write file
current_file_removes = [],
%% TRef for our interval timer
sync_timer_ref,
%% files that had removes
Expand Down Expand Up @@ -1150,7 +1152,11 @@ write_message(MsgId, Msg, CRef,
end, CRef, State1)
end.

remove_message(MsgId, CRef, State = #msstate{ index_ets = IndexEts }) ->
remove_message(MsgId, CRef,
State = #msstate{
index_ets = IndexEts,
current_file = CurrentFile,
current_file_removes = Removes }) ->
case should_mask_action(CRef, MsgId, State) of
{true, _Location} ->
State;
Expand All @@ -1162,22 +1168,32 @@ remove_message(MsgId, CRef, State = #msstate{ index_ets = IndexEts }) ->
%% ets:lookup(FileSummaryEts, File),
State;
{_Mask, #msg_location { ref_count = RefCount, file = File,
total_size = TotalSize }}
total_size = TotalSize } = Entry}
when RefCount > 0 ->
%% only update field, otherwise bad interaction with
%% concurrent GC
Dec = fun () -> index_update_ref_counter(IndexEts, MsgId, -1) end,
case RefCount of
%% don't remove from cur_file_cache_ets here because
%% Don't remove from cur_file_cache_ets here because
%% there may be further writes in the mailbox for the
%% same msg.
1 -> ok = Dec(),
delete_file_if_empty(
File, gc_candidate(File,
adjust_valid_total_size(
File, -TotalSize, State)));
_ -> ok = Dec(),
gc_candidate(File, State)
%% same msg. We will remove 0 ref_counts when rolling
%% over to the next write file.
1 when File =:= CurrentFile ->
index_update_ref_counter(IndexEts, MsgId, -1),
State1 = State#msstate{current_file_removes =
[Entry#msg_location{ref_count=0}|Removes]},
delete_file_if_empty(
File, gc_candidate(File,
adjust_valid_total_size(
File, -TotalSize, State1)));
1 ->
index_delete(IndexEts, MsgId),
delete_file_if_empty(
File, gc_candidate(File,
adjust_valid_total_size(
File, -TotalSize, State)));
_ ->
index_update_ref_counter(IndexEts, MsgId, -1),
gc_candidate(File, State)
end
end.

Expand Down Expand Up @@ -1239,7 +1255,9 @@ flush_or_roll_to_new_file(
cur_file_cache_ets = CurFileCacheEts,
file_size_limit = FileSizeLimit })
when Offset >= FileSizeLimit ->
State1 = internal_sync(State),
%% Cleanup the index of messages that were removed before rolling over.
State0 = cleanup_index_on_roll_over(State),
State1 = internal_sync(State0),
ok = writer_close(CurHdl),
NextFile = CurFile + 1,
{ok, NextHdl} = writer_open(Dir, NextFile),
Expand Down Expand Up @@ -1267,6 +1285,8 @@ write_large_message(MsgId, MsgBodyBin,
index_ets = IndexEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts }) ->
%% Cleanup the index of messages that were removed before rolling over.
State1 = cleanup_index_on_roll_over(State0),
{LargeMsgFile, LargeMsgHdl} = case CurOffset of
%% We haven't written in the file yet. Use it.
0 ->
Expand All @@ -1286,13 +1306,13 @@ write_large_message(MsgId, MsgBodyBin,
ok = index_insert(IndexEts,
#msg_location { msg_id = MsgId, ref_count = 1, file = LargeMsgFile,
offset = 0, total_size = TotalSize }),
State1 = case CurFile of
State2 = case CurFile of
%% We didn't open a new file. We must update the existing value.
LargeMsgFile ->
[_,_] = ets:update_counter(FileSummaryEts, LargeMsgFile,
[{#file_summary.valid_total_size, TotalSize},
{#file_summary.file_size, TotalSize}]),
State0;
State1;
%% We opened a new file. We can insert it all at once.
%% We must also check whether we need to delete the previous
%% current file, because if there is no valid data this is
Expand All @@ -1303,7 +1323,7 @@ write_large_message(MsgId, MsgBodyBin,
valid_total_size = TotalSize,
file_size = TotalSize,
locked = false }),
delete_file_if_empty(CurFile, State0 #msstate { current_file_handle = LargeMsgHdl,
delete_file_if_empty(CurFile, State1 #msstate { current_file_handle = LargeMsgHdl,
current_file = LargeMsgFile,
current_file_offset = TotalSize })
end,
Expand All @@ -1318,11 +1338,22 @@ write_large_message(MsgId, MsgBodyBin,
%% Delete messages from the cache that were written to disk.
true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
%% Process confirms (this won't flush; we already did) and continue.
State = internal_sync(State1),
State = internal_sync(State2),
State #msstate { current_file_handle = NextHdl,
current_file = NextFile,
current_file_offset = 0 }.

%% Flush the remove entries accumulated for the current write file:
%% drop each recorded index entry, then reset the accumulator.
cleanup_index_on_roll_over(State = #msstate{
                               index_ets = IndexEts,
                               current_file_removes = Removes}) ->
    %% Every recorded entry was snapshotted with ref_count=0. Deleting by
    %% whole object means an entry whose ref_count has since been bumped
    %% no longer matches and is left untouched — so no extra index lookup
    %% is needed to re-check the ref_count here.
    _ = [index_delete_object(IndexEts, Entry) || Entry <- Removes],
    State#msstate{current_file_removes = []}.

contains_message(MsgId, From, State = #msstate{ index_ets = IndexEts }) ->
MsgLocation = index_lookup_positive_ref_count(IndexEts, MsgId),
gen_server2:reply(From, MsgLocation =/= not_found),
Expand Down Expand Up @@ -1643,7 +1674,7 @@ index_update(IndexEts, Obj) ->
ok.

%% Update fields of an existing index entry.
%%
%% The result of ets:update_element/3 is deliberately ignored: it returns
%% false when the key is absent, which is tolerated here — presumably an
%% entry may already have been dropped (e.g. by the roll-over cleanup of
%% current-file removes elsewhere in this module); verify against callers.
index_update_fields(IndexEts, Key, Updates) ->
    _ = ets:update_element(IndexEts, Key, Updates),
    ok.

index_update_ref_counter(IndexEts, Key, RefCount) ->
Expand Down Expand Up @@ -1967,10 +1998,21 @@ delete_file_if_empty(File, State = #msstate {
%% We do not try to look at messages that are not the last because we do not want to
%% accidentally write over messages that were moved earlier.

compact_file(File, State = #gc_state { index_ets = IndexEts,
file_summary_ets = FileSummaryEts,
dir = Dir,
msg_store = Server }) ->
%% Entry point for compaction: look up the file's current size and hand
%% off to compact_file/3, unless the file has vanished in the meantime.
compact_file(File, State = #gc_state { file_summary_ets = FileSummaryEts }) ->
    case ets:lookup(FileSummaryEts, File) of
        [#file_summary{file_size = FileSize}] ->
            %% Still present: perform the actual compaction.
            compact_file(File, FileSize, State);
        [] ->
            %% The file was deleted after this compaction was requested.
            rabbit_log:debug("File ~tp has already been deleted; no need to compact",
                             [File]),
            ok
    end.

compact_file(File, FileSize,
State = #gc_state { index_ets = IndexEts,
file_summary_ets = FileSummaryEts,
dir = Dir,
msg_store = Server }) ->
%% Get metadata about the file. Will be used to calculate
%% how much data was reclaimed as a result of compaction.
[#file_summary{file_size = FileSize}] = ets:lookup(FileSummaryEts, File),
Expand Down Expand Up @@ -2123,9 +2165,9 @@ truncate_file(File, Size, ThresholdTimestamp, #gc_state{ file_summary_ets = File

-spec delete_file(non_neg_integer(), gc_state()) -> ok | defer.

delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
file_handles_ets = FileHandlesEts,
dir = Dir }) ->
delete_file(File, #gc_state { file_summary_ets = FileSummaryEts,
file_handles_ets = FileHandlesEts,
dir = Dir }) ->
case ets:match_object(FileHandlesEts, {{'_', File}, '_'}, 1) of
{[_|_], _Cont} ->
rabbit_log:debug("Asked to delete file ~p but it has active readers. Deferring.",
Expand All @@ -2134,7 +2176,6 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
_ ->
[#file_summary{ valid_total_size = 0,
file_size = FileSize }] = ets:lookup(FileSummaryEts, File),
[] = scan_and_vacuum_message_file(File, State),
ok = file:delete(form_filename(Dir, filenum_to_name(File))),
true = ets:delete(FileSummaryEts, File),
rabbit_log:debug("Deleted empty file number ~tp; reclaimed ~tp bytes", [File, FileSize]),
Expand Down
6 changes: 5 additions & 1 deletion deps/rabbit/src/rabbit_msg_store_gc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
-export([start_link/1, compact/2, truncate/4, delete/2, stop/1]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
terminate/2, code_change/3, prioritise_cast/3]).

-record(state,
{ pending,
Expand Down Expand Up @@ -51,6 +51,10 @@ delete(Server, File) ->
stop(Server) ->
gen_server2:call(Server, stop, infinity).

%% TODO replace with priority messages for OTP28+
%% Rank incoming casts: file-deletion requests jump ahead of other queued
%% work (compact/truncate), everything else keeps default priority.
prioritise_cast(Msg, _Len, _State) ->
    case Msg of
        {delete, _File} -> 5;
        _Other          -> 0
    end.

%%----------------------------------------------------------------------------

init([MsgStoreState]) ->
Expand Down
Loading