From f98637c9ee4131469348f8ad0a93915abd1bfba2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Wed, 26 Feb 2025 11:30:22 +0100 Subject: [PATCH] Fix CQ shared store files not deleted with large messages We must consider whether the previous current file is empty (has data written, but was already removed) when writing large messages and opening a file specifically for the large message. If we don't, then the file will never get deleted as we only consider files for deletion when a message gets removed (and there are none). This is only an issue for large messages. Small messages write a message than roll over to a new file, so there is at least one valid message. Large messages close the current file first, regardless of there being a valid message. (cherry picked from commit 6cf69e2a19b2c87cb0f1ccd07c07d2d4bf1bd546) --- deps/rabbit/src/rabbit_msg_store.erl | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/deps/rabbit/src/rabbit_msg_store.erl b/deps/rabbit/src/rabbit_msg_store.erl index 95cb9b401562..fdd09b1d2940 100644 --- a/deps/rabbit/src/rabbit_msg_store.erl +++ b/deps/rabbit/src/rabbit_msg_store.erl @@ -1274,19 +1274,26 @@ write_large_message(MsgId, MsgBodyBin, ok = index_insert(IndexEts, #msg_location { msg_id = MsgId, ref_count = 1, file = LargeMsgFile, offset = 0, total_size = TotalSize }), - _ = case CurFile of + State1 = case CurFile of %% We didn't open a new file. We must update the existing value. LargeMsgFile -> [_,_] = ets:update_counter(FileSummaryEts, LargeMsgFile, [{#file_summary.valid_total_size, TotalSize}, - {#file_summary.file_size, TotalSize}]); + {#file_summary.file_size, TotalSize}]), + State0; %% We opened a new file. We can insert it all at once. + %% We must also check whether we need to delete the previous + %% current file, because if there is no valid data this is + %% the only time we will consider it (outside recovery). _ -> true = ets:insert_new(FileSummaryEts, #file_summary { file = LargeMsgFile, valid_total_size = TotalSize, file_size = TotalSize, - locked = false }) + locked = false }), + delete_file_if_empty(CurFile, State0 #msstate { current_file_handle = LargeMsgHdl, + current_file = LargeMsgFile, + current_file_offset = TotalSize }) end, %% Roll over to the next file. NextFile = LargeMsgFile + 1, @@ -1299,7 +1306,7 @@ write_large_message(MsgId, MsgBodyBin, %% Delete messages from the cache that were written to disk. true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}), %% Process confirms (this won't flush; we already did) and continue. - State = internal_sync(State0), + State = internal_sync(State1), State #msstate { current_file_handle = NextHdl, current_file = NextFile, current_file_offset = 0 }.