diff --git a/deps/rabbit/src/rabbit_msg_store.erl b/deps/rabbit/src/rabbit_msg_store.erl index 482e9cfa4f45..5965589bfd11 100644 --- a/deps/rabbit/src/rabbit_msg_store.erl +++ b/deps/rabbit/src/rabbit_msg_store.erl @@ -77,8 +77,10 @@ current_file, %% current file handle since the last fsync? current_file_handle, - %% file handle cache + %% current write file offset current_file_offset, + %% messages that were potentially removed from the current write file + current_file_removes = [], %% TRef for our interval timer sync_timer_ref, %% files that had removes @@ -1150,7 +1152,11 @@ write_message(MsgId, Msg, CRef, end, CRef, State1) end. -remove_message(MsgId, CRef, State = #msstate{ index_ets = IndexEts }) -> +remove_message(MsgId, CRef, + State = #msstate{ + index_ets = IndexEts, + current_file = CurrentFile, + current_file_removes = Removes }) -> case should_mask_action(CRef, MsgId, State) of {true, _Location} -> State; @@ -1162,22 +1168,32 @@ remove_message(MsgId, CRef, State = #msstate{ index_ets = IndexEts }) -> %% ets:lookup(FileSummaryEts, File), State; {_Mask, #msg_location { ref_count = RefCount, file = File, - total_size = TotalSize }} + total_size = TotalSize } = Entry} when RefCount > 0 -> %% only update field, otherwise bad interaction with %% concurrent GC - Dec = fun () -> index_update_ref_counter(IndexEts, MsgId, -1) end, case RefCount of - %% don't remove from cur_file_cache_ets here because + %% Don't remove from cur_file_cache_ets here because %% there may be further writes in the mailbox for the - %% same msg. - 1 -> ok = Dec(), - delete_file_if_empty( - File, gc_candidate(File, - adjust_valid_total_size( - File, -TotalSize, State))); - _ -> ok = Dec(), - gc_candidate(File, State) + %% same msg. We will remove 0 ref_counts when rolling + %% over to the next write file. 
+ 1 when File =:= CurrentFile -> + index_update_ref_counter(IndexEts, MsgId, -1), + State1 = State#msstate{current_file_removes = + [Entry#msg_location{ref_count=0}|Removes]}, + delete_file_if_empty( + File, gc_candidate(File, + adjust_valid_total_size( + File, -TotalSize, State1))); + 1 -> + index_delete(IndexEts, MsgId), + delete_file_if_empty( + File, gc_candidate(File, + adjust_valid_total_size( + File, -TotalSize, State))); + _ -> + index_update_ref_counter(IndexEts, MsgId, -1), + gc_candidate(File, State) end end. @@ -1239,7 +1255,9 @@ flush_or_roll_to_new_file( cur_file_cache_ets = CurFileCacheEts, file_size_limit = FileSizeLimit }) when Offset >= FileSizeLimit -> - State1 = internal_sync(State), + %% Clean up the index of messages that were removed before rolling over. + State0 = cleanup_index_on_roll_over(State), + State1 = internal_sync(State0), ok = writer_close(CurHdl), NextFile = CurFile + 1, {ok, NextHdl} = writer_open(Dir, NextFile), @@ -1267,6 +1285,8 @@ write_large_message(MsgId, MsgBodyBin, index_ets = IndexEts, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts }) -> + %% Clean up the index of messages that were removed before rolling over. + State1 = cleanup_index_on_roll_over(State0), {LargeMsgFile, LargeMsgHdl} = case CurOffset of %% We haven't written in the file yet. Use it. 0 -> @@ -1286,13 +1306,13 @@ write_large_message(MsgId, MsgBodyBin, ok = index_insert(IndexEts, #msg_location { msg_id = MsgId, ref_count = 1, file = LargeMsgFile, offset = 0, total_size = TotalSize }), - State1 = case CurFile of + State2 = case CurFile of %% We didn't open a new file. We must update the existing value. LargeMsgFile -> [_,_] = ets:update_counter(FileSummaryEts, LargeMsgFile, [{#file_summary.valid_total_size, TotalSize}, {#file_summary.file_size, TotalSize}]), - State0; + State1; %% We opened a new file. We can insert it all at once. 
%% We must also check whether we need to delete the previous %% current file, because if there is no valid data this is @@ -1303,7 +1323,7 @@ write_large_message(MsgId, MsgBodyBin, valid_total_size = TotalSize, file_size = TotalSize, locked = false }), - delete_file_if_empty(CurFile, State0 #msstate { current_file_handle = LargeMsgHdl, + delete_file_if_empty(CurFile, State1 #msstate { current_file_handle = LargeMsgHdl, current_file = LargeMsgFile, current_file_offset = TotalSize }) end, @@ -1318,11 +1338,22 @@ write_large_message(MsgId, MsgBodyBin, %% Delete messages from the cache that were written to disk. true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}), %% Process confirms (this won't flush; we already did) and continue. - State = internal_sync(State1), + State = internal_sync(State2), State #msstate { current_file_handle = NextHdl, current_file = NextFile, current_file_offset = 0 }. +cleanup_index_on_roll_over(State = #msstate{ + index_ets = IndexEts, + current_file_removes = Removes}) -> + lists:foreach(fun(Entry) -> + %% We delete objects that have ref_count=0. If a message + %% got its ref_count increased, it will not be deleted. + %% We thus avoid extra index lookups to check for ref_count. + index_delete_object(IndexEts, Entry) + end, Removes), + State#msstate{current_file_removes=[]}. + contains_message(MsgId, From, State = #msstate{ index_ets = IndexEts }) -> MsgLocation = index_lookup_positive_ref_count(IndexEts, MsgId), gen_server2:reply(From, MsgLocation =/= not_found), @@ -1643,7 +1674,7 @@ index_update(IndexEts, Obj) -> ok. index_update_fields(IndexEts, Key, Updates) -> - true = ets:update_element(IndexEts, Key, Updates), + _ = ets:update_element(IndexEts, Key, Updates), ok. index_update_ref_counter(IndexEts, Key, RefCount) -> @@ -1967,10 +1998,21 @@ delete_file_if_empty(File, State = #msstate { %% We do not try to look at messages that are not the last because we do not want to %% accidentally write over messages that were moved earlier. 
-compact_file(File, State = #gc_state { index_ets = IndexEts, - file_summary_ets = FileSummaryEts, - dir = Dir, - msg_store = Server }) -> +compact_file(File, State = #gc_state { file_summary_ets = FileSummaryEts }) -> + case ets:lookup(FileSummaryEts, File) of + [] -> + rabbit_log:debug("File ~tp has already been deleted; no need to compact", + [File]), + ok; + [#file_summary{file_size = FileSize}] -> + compact_file(File, FileSize, State) + end. + +compact_file(File, FileSize, + State = #gc_state { index_ets = IndexEts, + file_summary_ets = FileSummaryEts, + dir = Dir, + msg_store = Server }) -> %% Get metadata about the file. Will be used to calculate %% how much data was reclaimed as a result of compaction. [#file_summary{file_size = FileSize}] = ets:lookup(FileSummaryEts, File), @@ -2123,9 +2165,9 @@ truncate_file(File, Size, ThresholdTimestamp, #gc_state{ file_summary_ets = File -spec delete_file(non_neg_integer(), gc_state()) -> ok | defer. -delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, - file_handles_ets = FileHandlesEts, - dir = Dir }) -> +delete_file(File, #gc_state { file_summary_ets = FileSummaryEts, + file_handles_ets = FileHandlesEts, + dir = Dir }) -> case ets:match_object(FileHandlesEts, {{'_', File}, '_'}, 1) of {[_|_], _Cont} -> rabbit_log:debug("Asked to delete file ~p but it has active readers. 
Deferring.", @@ -2134,7 +2176,6 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, _ -> [#file_summary{ valid_total_size = 0, file_size = FileSize }] = ets:lookup(FileSummaryEts, File), - [] = scan_and_vacuum_message_file(File, State), ok = file:delete(form_filename(Dir, filenum_to_name(File))), true = ets:delete(FileSummaryEts, File), rabbit_log:debug("Deleted empty file number ~tp; reclaimed ~tp bytes", [File, FileSize]), diff --git a/deps/rabbit/src/rabbit_msg_store_gc.erl b/deps/rabbit/src/rabbit_msg_store_gc.erl index f18100c0b254..868dc3087b89 100644 --- a/deps/rabbit/src/rabbit_msg_store_gc.erl +++ b/deps/rabbit/src/rabbit_msg_store_gc.erl @@ -12,7 +12,7 @@ -export([start_link/1, compact/2, truncate/4, delete/2, stop/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + terminate/2, code_change/3, prioritise_cast/3]). -record(state, { pending, @@ -51,6 +51,10 @@ delete(Server, File) -> stop(Server) -> gen_server2:call(Server, stop, infinity). +%% TODO replace with priority messages for OTP28+ +prioritise_cast({delete, _}, _Len, _State) -> 5; +prioritise_cast(_, _Len, _State) -> 0. + %%---------------------------------------------------------------------------- init([MsgStoreState]) ->