Skip to content

Commit ee6ffd9

Browse files
Merge pull request #13987 from rabbitmq/mergify/bp/v4.1.x/pr-13959
CQ shared store: Delete from index on remove or roll over (backport #13959)
2 parents 39951de + 1c956b4 commit ee6ffd9

File tree

2 files changed: 73 additions (+73) and 28 deletions (-28).

deps/rabbit/src/rabbit_msg_store.erl

Lines changed: 68 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,10 @@
7777
current_file,
7878
%% current file handle since the last fsync?
7979
current_file_handle,
80-
%% file handle cache
80+
%% current write file offset
8181
current_file_offset,
82+
%% messages that were potentially removed from the current write file
83+
current_file_removes = [],
8284
%% TRef for our interval timer
8385
sync_timer_ref,
8486
%% files that had removes
@@ -1150,7 +1152,11 @@ write_message(MsgId, Msg, CRef,
11501152
end, CRef, State1)
11511153
end.
11521154

1153-
remove_message(MsgId, CRef, State = #msstate{ index_ets = IndexEts }) ->
1155+
remove_message(MsgId, CRef,
1156+
State = #msstate{
1157+
index_ets = IndexEts,
1158+
current_file = CurrentFile,
1159+
current_file_removes = Removes }) ->
11541160
case should_mask_action(CRef, MsgId, State) of
11551161
{true, _Location} ->
11561162
State;
@@ -1162,22 +1168,32 @@ remove_message(MsgId, CRef, State = #msstate{ index_ets = IndexEts }) ->
11621168
%% ets:lookup(FileSummaryEts, File),
11631169
State;
11641170
{_Mask, #msg_location { ref_count = RefCount, file = File,
1165-
total_size = TotalSize }}
1171+
total_size = TotalSize } = Entry}
11661172
when RefCount > 0 ->
11671173
%% only update field, otherwise bad interaction with
11681174
%% concurrent GC
1169-
Dec = fun () -> index_update_ref_counter(IndexEts, MsgId, -1) end,
11701175
case RefCount of
1171-
%% don't remove from cur_file_cache_ets here because
1176+
%% Don't remove from cur_file_cache_ets here because
11721177
%% there may be further writes in the mailbox for the
1173-
%% same msg.
1174-
1 -> ok = Dec(),
1175-
delete_file_if_empty(
1176-
File, gc_candidate(File,
1177-
adjust_valid_total_size(
1178-
File, -TotalSize, State)));
1179-
_ -> ok = Dec(),
1180-
gc_candidate(File, State)
1178+
%% same msg. We will remove 0 ref_counts when rolling
1179+
%% over to the next write file.
1180+
1 when File =:= CurrentFile ->
1181+
index_update_ref_counter(IndexEts, MsgId, -1),
1182+
State1 = State#msstate{current_file_removes =
1183+
[Entry#msg_location{ref_count=0}|Removes]},
1184+
delete_file_if_empty(
1185+
File, gc_candidate(File,
1186+
adjust_valid_total_size(
1187+
File, -TotalSize, State1)));
1188+
1 ->
1189+
index_delete(IndexEts, MsgId),
1190+
delete_file_if_empty(
1191+
File, gc_candidate(File,
1192+
adjust_valid_total_size(
1193+
File, -TotalSize, State)));
1194+
_ ->
1195+
index_update_ref_counter(IndexEts, MsgId, -1),
1196+
gc_candidate(File, State)
11811197
end
11821198
end.
11831199

@@ -1239,7 +1255,9 @@ flush_or_roll_to_new_file(
12391255
cur_file_cache_ets = CurFileCacheEts,
12401256
file_size_limit = FileSizeLimit })
12411257
when Offset >= FileSizeLimit ->
1242-
State1 = internal_sync(State),
1258+
%% Cleanup the index of messages that were removed before rolling over.
1259+
State0 = cleanup_index_on_roll_over(State),
1260+
State1 = internal_sync(State0),
12431261
ok = writer_close(CurHdl),
12441262
NextFile = CurFile + 1,
12451263
{ok, NextHdl} = writer_open(Dir, NextFile),
@@ -1267,6 +1285,8 @@ write_large_message(MsgId, MsgBodyBin,
12671285
index_ets = IndexEts,
12681286
file_summary_ets = FileSummaryEts,
12691287
cur_file_cache_ets = CurFileCacheEts }) ->
1288+
%% Cleanup the index of messages that were removed before rolling over.
1289+
State1 = cleanup_index_on_roll_over(State0),
12701290
{LargeMsgFile, LargeMsgHdl} = case CurOffset of
12711291
%% We haven't written in the file yet. Use it.
12721292
0 ->
@@ -1286,13 +1306,13 @@ write_large_message(MsgId, MsgBodyBin,
12861306
ok = index_insert(IndexEts,
12871307
#msg_location { msg_id = MsgId, ref_count = 1, file = LargeMsgFile,
12881308
offset = 0, total_size = TotalSize }),
1289-
State1 = case CurFile of
1309+
State2 = case CurFile of
12901310
%% We didn't open a new file. We must update the existing value.
12911311
LargeMsgFile ->
12921312
[_,_] = ets:update_counter(FileSummaryEts, LargeMsgFile,
12931313
[{#file_summary.valid_total_size, TotalSize},
12941314
{#file_summary.file_size, TotalSize}]),
1295-
State0;
1315+
State1;
12961316
%% We opened a new file. We can insert it all at once.
12971317
%% We must also check whether we need to delete the previous
12981318
%% current file, because if there is no valid data this is
@@ -1303,7 +1323,7 @@ write_large_message(MsgId, MsgBodyBin,
13031323
valid_total_size = TotalSize,
13041324
file_size = TotalSize,
13051325
locked = false }),
1306-
delete_file_if_empty(CurFile, State0 #msstate { current_file_handle = LargeMsgHdl,
1326+
delete_file_if_empty(CurFile, State1 #msstate { current_file_handle = LargeMsgHdl,
13071327
current_file = LargeMsgFile,
13081328
current_file_offset = TotalSize })
13091329
end,
@@ -1318,11 +1338,22 @@ write_large_message(MsgId, MsgBodyBin,
13181338
%% Delete messages from the cache that were written to disk.
13191339
true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
13201340
%% Process confirms (this won't flush; we already did) and continue.
1321-
State = internal_sync(State1),
1341+
State = internal_sync(State2),
13221342
State #msstate { current_file_handle = NextHdl,
13231343
current_file = NextFile,
13241344
current_file_offset = 0 }.
13251345

1346+
%% Drops from the index every entry recorded in current_file_removes and
%% returns the state with that list reset to [].
%%
%% The recorded entries are #msg_location{} records stored with ref_count=0.
%% They are handed to index_delete_object/2 as whole objects, so a message
%% whose ref_count has been increased since it was recorded will not be
%% deleted. This avoids an extra index lookup per entry just to check the
%% ref_count.
cleanup_index_on_roll_over(#msstate{index_ets            = IndexEts,
                                    current_file_removes = Removed} = State) ->
    %% Only the side effect on the index matters; the per-entry results are
    %% deliberately discarded.
    _ = [index_delete_object(IndexEts, ZeroRefEntry) || ZeroRefEntry <- Removed],
    State#msstate{current_file_removes = []}.
1356+
13261357
contains_message(MsgId, From, State = #msstate{ index_ets = IndexEts }) ->
13271358
MsgLocation = index_lookup_positive_ref_count(IndexEts, MsgId),
13281359
gen_server2:reply(From, MsgLocation =/= not_found),
@@ -1643,7 +1674,7 @@ index_update(IndexEts, Obj) ->
16431674
ok.
16441675

16451676
index_update_fields(IndexEts, Key, Updates) ->
1646-
true = ets:update_element(IndexEts, Key, Updates),
1677+
_ = ets:update_element(IndexEts, Key, Updates),
16471678
ok.
16481679

16491680
index_update_ref_counter(IndexEts, Key, RefCount) ->
@@ -1967,10 +1998,21 @@ delete_file_if_empty(File, State = #msstate {
19671998
%% We do not try to look at messages that are not the last because we do not want to
19681999
%% accidentally write over messages that were moved earlier.
19692000

1970-
compact_file(File, State = #gc_state { index_ets = IndexEts,
1971-
file_summary_ets = FileSummaryEts,
1972-
dir = Dir,
1973-
msg_store = Server }) ->
2001+
%% Entry point for compacting File.
%%
%% Looks File up in the file summary table first:
%%  - when a summary row exists, its file_size is passed on to
%%    compact_file/3 and that function's result is returned;
%%  - when no row exists, the file has already been deleted, so this only
%%    logs at debug level and returns ok.
compact_file(File, #gc_state{file_summary_ets = SummaryTab} = GcState) ->
    case ets:lookup(SummaryTab, File) of
        [#file_summary{file_size = Size}] ->
            compact_file(File, Size, GcState);
        [] ->
            rabbit_log:debug("File ~tp has already been deleted; no need to compact",
                             [File]),
            ok
    end.
2010+
2011+
compact_file(File, FileSize,
2012+
State = #gc_state { index_ets = IndexEts,
2013+
file_summary_ets = FileSummaryEts,
2014+
dir = Dir,
2015+
msg_store = Server }) ->
19742016
%% Get metadata about the file. Will be used to calculate
19752017
%% how much data was reclaimed as a result of compaction.
19762018
[#file_summary{file_size = FileSize}] = ets:lookup(FileSummaryEts, File),
@@ -2123,9 +2165,9 @@ truncate_file(File, Size, ThresholdTimestamp, #gc_state{ file_summary_ets = File
21232165

21242166
-spec delete_file(non_neg_integer(), gc_state()) -> ok | defer.
21252167

2126-
delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
2127-
file_handles_ets = FileHandlesEts,
2128-
dir = Dir }) ->
2168+
delete_file(File, #gc_state { file_summary_ets = FileSummaryEts,
2169+
file_handles_ets = FileHandlesEts,
2170+
dir = Dir }) ->
21292171
case ets:match_object(FileHandlesEts, {{'_', File}, '_'}, 1) of
21302172
{[_|_], _Cont} ->
21312173
rabbit_log:debug("Asked to delete file ~p but it has active readers. Deferring.",
@@ -2134,7 +2176,6 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
21342176
_ ->
21352177
[#file_summary{ valid_total_size = 0,
21362178
file_size = FileSize }] = ets:lookup(FileSummaryEts, File),
2137-
[] = scan_and_vacuum_message_file(File, State),
21382179
ok = file:delete(form_filename(Dir, filenum_to_name(File))),
21392180
true = ets:delete(FileSummaryEts, File),
21402181
rabbit_log:debug("Deleted empty file number ~tp; reclaimed ~tp bytes", [File, FileSize]),

deps/rabbit/src/rabbit_msg_store_gc.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
-export([start_link/1, compact/2, truncate/4, delete/2, stop/1]).
1313

1414
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
15-
terminate/2, code_change/3]).
15+
terminate/2, code_change/3, prioritise_cast/3]).
1616

1717
-record(state,
1818
{ pending,
@@ -51,6 +51,10 @@ delete(Server, File) ->
5151
stop(Server) ->
5252
gen_server2:call(Server, stop, infinity).
5353

54+
%% TODO replace with priority messages for OTP28+
%%
%% Assigns a priority to an incoming cast: {delete, _} casts get priority 5,
%% every other cast gets priority 0. The second and third arguments are
%% ignored.
prioritise_cast({delete, _File}, _QueueLen, _GcState) ->
    5;
prioritise_cast(_OtherCast, _QueueLen, _GcState) ->
    0.
57+
5458
%%----------------------------------------------------------------------------
5559

5660
init([MsgStoreState]) ->

0 commit comments

Comments
 (0)