From c81fbc5dc028936ef2e833db6369a5975d1560a3 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 24 Oct 2025 13:02:46 -0400 Subject: [PATCH 01/10] Allow configuring osiris data_dir in Cuttlefish config This is the same as the `raft.data_dir` option but for Osiris' data directory. Configuring this in Cuttlefish is nicer than the existing `$RABBITMQ_STREAM_DIR` environment variable way of changing the dir. --- deps/rabbit/priv/schema/rabbit.schema | 13 +++++++++++++ .../test/config_schema_SUITE_data/rabbit.snippets | 10 ++++++++++ 2 files changed, 23 insertions(+) diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 1b4d43593ce..e6017f23f2a 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -2776,6 +2776,19 @@ fun(Conf) -> end end}. +{mapping, "stream.data_dir", "osiris.data_dir", [ + {datatype, string} +]}. + +{translation, "osiris.data_dir", + fun(Conf) -> + case cuttlefish:conf_get("stream.data_dir", Conf, undefined) of + undefined -> cuttlefish:unset(); + Val -> Val + end + end +}. + {mapping, "stream.read_ahead", "rabbit.stream_read_ahead", [{datatype, {enum, [true, false]}}]}. diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index dc214a17076..a68868e4631 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -1253,6 +1253,16 @@ credential_validator.regexp = ^abc\\d+", [{rabbit, [ {stream_read_ahead, false} ]}], + []}, + + %% + %% Stream data dir + %% + {stream_data_dir, + "stream.data_dir = /data/rabbitmq/stream", + [{osiris, [ + {data_dir, "/data/rabbitmq/stream"} + ]}], []} ]. 
From 2116c0c6ea1c2f12579635c1db37e36e469e86c2 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 24 Oct 2025 14:00:40 -0400 Subject: [PATCH 02/10] rabbit_stream_queue: Enable recovery after registering queue type --- deps/rabbit/src/rabbit_stream_queue.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index affe36296df..02899f1827b 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -126,7 +126,8 @@ [queue, <<"stream">>, ?MODULE]}}, {cleanup, {rabbit_registry, unregister, [queue, <<"stream">>]}}, - {requires, rabbit_registry} + {requires, rabbit_registry}, + {enables, recovery} ]}). -type client() :: #stream_client{}. From 6295123769628569be7869d29ca9b9f9c46ab823 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Tue, 21 Oct 2025 17:38:49 -0400 Subject: [PATCH 03/10] rabbit_alarm: Prefer maps to dicts This is not a functional change, just a refactor to eliminate dicts and use maps instead. This cleans up some helper functions like dict_append/3, and we can use map comprehensions in some places to avoid intermediary lists. --- deps/rabbit/src/rabbit_alarm.erl | 84 +++++++++++++++----------------- 1 file changed, 38 insertions(+), 46 deletions(-) diff --git a/deps/rabbit/src/rabbit_alarm.erl b/deps/rabbit/src/rabbit_alarm.erl index 879fdae8189..84c5188f159 100644 --- a/deps/rabbit/src/rabbit_alarm.erl +++ b/deps/rabbit/src/rabbit_alarm.erl @@ -42,9 +42,9 @@ %%---------------------------------------------------------------------------- --record(alarms, {alertees :: dict:dict(pid(), rabbit_types:mfargs()), - alarmed_nodes :: dict:dict(node(), [resource_alarm_source()]), - alarms :: [alarm()]}). +-record(alarms, {alertees = #{} :: #{pid() => rabbit_types:mfargs()}, + alarmed_nodes = #{} :: #{node() => [resource_alarm_source()]}, + alarms = [] :: [alarm()]}). -export_type([alarm/0]). 
-type local_alarm() :: 'file_descriptor_limit'. @@ -90,7 +90,7 @@ stop() -> ok. %% called like this: `apply(M, F, A ++ [Pid, Source, Alert])', where `Source' %% has the type of resource_alarm_source() and `Alert' has the type of resource_alert(). --spec register(pid(), rabbit_types:mfargs()) -> [atom()]. +-spec register(pid(), rabbit_types:mfargs()) -> [resource_alarm_source()]. register(Pid, AlertMFA) -> gen_event:call(?SERVER, ?MODULE, {register, Pid, AlertMFA}, infinity). @@ -177,12 +177,10 @@ remote_conserve_resources(Pid, Source, {false, _, _}) -> %%---------------------------------------------------------------------------- init([]) -> - {ok, #alarms{alertees = dict:new(), - alarmed_nodes = dict:new(), - alarms = []}}. + {ok, #alarms{}}. handle_call({register, Pid, AlertMFA}, State = #alarms{alarmed_nodes = AN}) -> - {ok, lists:usort(lists:append([V || {_, V} <- dict:to_list(AN)])), + {ok, lists:usort(lists:append(maps:values(AN))), internal_register(Pid, AlertMFA, State)}; handle_call(get_alarms, State) -> @@ -236,14 +234,11 @@ handle_event({node_up, Node}, State) -> {ok, State}; handle_event({node_down, Node}, #alarms{alarmed_nodes = AN} = State) -> - AlarmsForDeadNode = case dict:find(Node, AN) of - {ok, V} -> V; - error -> [] - end, + AlarmsForDeadNode = maps:get(Node, AN, []), {ok, lists:foldr(fun(Source, AccState) -> ?LOG_WARNING("~ts resource limit alarm cleared for dead node ~tp", [Source, Node]), - maybe_alert(fun dict_unappend/3, Node, Source, false, AccState) + maybe_alert(fun map_unappend/3, Node, Source, false, AccState) end, State, AlarmsForDeadNode)}; handle_event({register, Pid, AlertMFA}, State) -> @@ -254,7 +249,7 @@ handle_event(_Event, State) -> handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #alarms{alertees = Alertees}) -> - {ok, State#alarms{alertees = dict:erase(Pid, Alertees)}}; + {ok, State#alarms{alertees = maps:remove(Pid, Alertees)}}; handle_info(_Info, State) -> {ok, State}. 
@@ -267,22 +262,14 @@ code_change(_OldVsn, State, _Extra) ->
 
 %%----------------------------------------------------------------------------
 
-dict_append(Key, Val, Dict) ->
-    L = case dict:find(Key, Dict) of
-            {ok, V} -> V;
-            error   -> []
-        end,
-    dict:store(Key, lists:usort([Val|L]), Dict).
-
-dict_unappend(Key, Val, Dict) ->
-    L = case dict:find(Key, Dict) of
-            {ok, V} -> V;
-            error   -> []
-        end,
+map_append(Key, Val, Map) -> % keeps each list sorted and duplicate-free, as dict_append/3 did
+    maps:update_with(Key, fun(Vs) -> lists:usort([Val | Vs]) end, [Val], Map).
 
+map_unappend(Key, Val, Map) -> % drops Key entirely once its list becomes empty
+    L = maps:get(Key, Map, []),
     case lists:delete(Val, L) of
-        [] -> dict:erase(Key, Dict);
-        X  -> dict:store(Key, X, Dict)
+        [] -> maps:remove(Key, Map);
+        X  -> Map#{Key := X}
     end.
 
 maybe_alert(UpdateFun, Node, Source, WasAlertAdded,
@@ -290,7 +277,10 @@ maybe_alert(UpdateFun, Node, Source, WasAlertAdded,
                             alertees      = Alertees}) ->
     AN1 = UpdateFun(Node, Source, AN),
     %% Is alarm for Source still set on any node?
-    StillHasAlerts = lists:any(fun ({_Node, NodeAlerts}) -> lists:member(Source, NodeAlerts) end, dict:to_list(AN1)),
+    StillHasAlerts = rabbit_misc:maps_any(
+                       fun(_Node, NodeAlerts) ->
+                               lists:member(Source, NodeAlerts)
+                       end, AN1),
     case StillHasAlerts of
         true -> ok;
         false -> ?LOG_WARNING("~ts resource limit alarm cleared across the cluster", [Source])
@@ -311,22 +301,24 @@ alert_remote(Alert, Alertees, Source) ->
 
 alert(Alertees, Source, Alert, NodeComparator) ->
     Node = node(),
-    dict:fold(fun (Pid, {M, F, A}, ok) ->
-                      case NodeComparator(Node, node(Pid)) of
-                          true  -> apply(M, F, A ++ [Pid, Source, Alert]);
-                          false -> ok
-                      end
-              end, ok, Alertees).
+    maps:foreach(fun (Pid, {M, F, A}) ->
+                         case NodeComparator(Node, node(Pid)) of
+                             true  -> apply(M, F, A ++ [Pid, Source, Alert]);
+                             false -> ok
+                         end
+                 end, Alertees).
internal_register(Pid, {M, F, A} = AlertMFA, State = #alarms{alertees = Alertees}) -> _MRef = erlang:monitor(process, Pid), - _ = case dict:find(node(), State#alarms.alarmed_nodes) of - {ok, Sources} -> [apply(M, F, A ++ [Pid, R, {true, true, node()}]) || R <- Sources]; - error -> ok + Node = node(), + _ = case State#alarms.alarmed_nodes of + #{Node := Sources} -> + [apply(M, F, A ++ [Pid, R, {true, true, node()}]) || R <- Sources]; + _ -> + ok end, - NewAlertees = dict:store(Pid, AlertMFA, Alertees), - State#alarms{alertees = NewAlertees}. + State#alarms{alertees = Alertees#{Pid => AlertMFA}}. handle_set_resource_alarm(Source, Node, State) -> ?LOG_WARNING( @@ -335,7 +327,7 @@ handle_set_resource_alarm(Source, Node, State) -> "*** Publishers will be blocked until this alarm clears ***~n" "**********************************************************~n", [Source, Node]), - {ok, maybe_alert(fun dict_append/3, Node, Source, true, State)}. + {ok, maybe_alert(fun map_append/3, Node, Source, true, State)}. handle_set_alarm({file_descriptor_limit, []}, State) -> ?LOG_WARNING( @@ -351,7 +343,7 @@ handle_set_alarm(Alarm, State) -> handle_clear_resource_alarm(Source, Node, State) -> ?LOG_WARNING("~ts resource limit alarm cleared on node ~tp", [Source, Node]), - {ok, maybe_alert(fun dict_unappend/3, Node, Source, false, State)}. + {ok, maybe_alert(fun map_unappend/3, Node, Source, false, State)}. handle_clear_alarm(file_descriptor_limit, State) -> ?LOG_WARNING("file descriptor limit alarm cleared~n"), @@ -361,14 +353,14 @@ handle_clear_alarm(Alarm, State) -> {ok, State}. is_node_alarmed(Source, Node, #alarms{alarmed_nodes = AN}) -> - case dict:find(Node, AN) of - {ok, Sources} -> + case AN of + #{Node := Sources} -> lists:member(Source, Sources); - error -> + _ -> false end. compute_alarms(#alarms{alarms = Alarms, alarmed_nodes = AN}) -> Alarms ++ [ {{resource_limit, Source, Node}, []} - || {Node, Sources} <- dict:to_list(AN), Source <- Sources ]. 
+ || Node := Sources <- AN, Source <- Sources ]. From 7f9d9d1e8c16655fadbe567e68c1901b10d78d99 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Tue, 16 Sep 2025 10:36:35 -0400 Subject: [PATCH 04/10] rabbit_env: Enable disksup in os_mon but set threshold to 1.0 Previously we set `start_disksup` to `false` to avoid OTP's automatic monitoring of disk space. `disksup`'s gen_server starts a port (which runs `df` on Unix) which measures disk usage and sets an alarm through OTP's `alarm_handler` when usage exceeds the configured `disk_almost_full_threshold`. We can set this threshold to 1.0 to effectively turn off disksup's monitoring (i.e. the alarm will never be set). By enabling disksup we have access to `get_disk_data/0` and `get_disk_info/0,1` which can be used to replace the copied versions in `rabbit_disk_monitor`. --- deps/rabbit_common/src/rabbit_env.erl | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/deps/rabbit_common/src/rabbit_env.erl b/deps/rabbit_common/src/rabbit_env.erl index e3c551ae9c0..bc70a8e3128 100644 --- a/deps/rabbit_common/src/rabbit_env.erl +++ b/deps/rabbit_common/src/rabbit_env.erl @@ -306,7 +306,12 @@ context_to_app_env_vars1( [{kernel, inet_default_connect_options, [{nodelay, true}]}, {sasl, errlog_type, error}, {os_mon, start_cpu_sup, false}, - {os_mon, start_disksup, false}, + %% Start disksup but configure the threshold high enough that it will + %% never alarm. `disksup' must be started to call `get_disk_info/0,1' + %% and `get_disk_data/0' but we don't want it polluting the logs with + %% its alarms. Alarming is done by `rabbit_disk_monitor' instead. 
+ {os_mon, start_disksup, true}, + {os_mon, disk_almost_full_threshold, 1.0}, {os_mon, start_memsup, false}, {mnesia, dir, DataDir}, {ra, data_dir, QuorumQueueDir}, From 3da73a715a0b6ce638e7617e2b77428290dbc80d Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Tue, 16 Sep 2025 12:07:59 -0400 Subject: [PATCH 05/10] rabbit_disk_monitor: Use disksup to determine available bytes `disksup` now exposes the calculation for available disk space for a given path using the same `df` mechanism on Unix. We can use this directly and drop the custom code which reimplements that. --- deps/rabbit/src/rabbit_disk_monitor.erl | 195 ++----------------- deps/rabbit/test/unit_disk_monitor_SUITE.erl | 4 +- 2 files changed, 18 insertions(+), 181 deletions(-) diff --git a/deps/rabbit/src/rabbit_disk_monitor.erl b/deps/rabbit/src/rabbit_disk_monitor.erl index 292bce853d7..12f8ae29d29 100644 --- a/deps/rabbit/src/rabbit_disk_monitor.erl +++ b/deps/rabbit/src/rabbit_disk_monitor.erl @@ -15,11 +15,6 @@ %% watermark (configurable either as an absolute value or %% relative to the memory limit). %% -%% Disk monitoring is done by shelling out to /usr/bin/df -%% instead of related built-in OTP functions because currently -%% this is the most reliable way of determining free disk space -%% for the partition our internal database is on. -%% %% Update interval is dynamically calculated assuming disk %% space is being filled at FAST_RATE. @@ -65,11 +60,7 @@ %% on start-up retries, %% Interval between retries - interval, - %% Operating system in use - os, - %% Port running sh to execute df commands - port + interval }). 
%%---------------------------------------------------------------------------- @@ -134,18 +125,9 @@ init([Limit]) -> State1 = set_min_check_interval(?DEFAULT_MIN_DISK_CHECK_INTERVAL, State0), State2 = set_max_check_interval(?DEFAULT_MAX_DISK_CHECK_INTERVAL, State1), - OS = os:type(), - Port = case OS of - {unix, _} -> - start_portprogram(); - {win32, _OSname} -> - not_used - end, - State3 = State2#state{port=Port, os=OS}, - - State4 = enable(State3), + State3 = enable(State2), - {ok, State4}. + {ok, State3}. handle_call({set_disk_free_limit, _}, _From, #state{enabled = false} = State) -> ?LOG_INFO("Cannot set disk free limit: " @@ -210,43 +192,6 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%---------------------------------------------------------------------------- -start_portprogram() -> - Args = ["-s", "rabbit_disk_monitor"], - Opts = [stream, stderr_to_stdout, {args, Args}], - erlang:open_port({spawn_executable, "/bin/sh"}, Opts). - -run_port_cmd(Cmd0, Port) -> - %% Insert a carriage return, ^M or ASCII 13, after the command, - %% to indicate end of output - Cmd1 = io_lib:format("~ts < /dev/null; echo \"\^M\"~n", [Cmd0]), - Cmd2 = rabbit_data_coercion:to_utf8_binary(Cmd1), - Port ! {self(), {command, [Cmd2, 10]}}, % The 10 at the end is a newline - get_reply(Port, []). - -get_reply(Port, O) -> - receive - {Port, {data, N}} -> - case newline(N, O) of - {ok, Str} -> - Str; - {more, Acc} -> - get_reply(Port, Acc) - end; - {'EXIT', Port, Reason} -> - exit({port_died, Reason}) - end. - -% Character 13 is ^M or carriage return -newline([13|_], B) -> - {ok, lists:reverse(B)}; -newline([H|T], B) -> - newline(T, [H|B]); -newline([], B) -> - {more, B}. - -find_cmd(Cmd) -> - os:find_executable(Cmd). 
- safe_ets_lookup(Key, Default) -> try case ets:lookup(?ETS_NAME, Key) of @@ -281,10 +226,8 @@ set_disk_limits(State, Limit0) -> internal_update(State = #state{limit = Limit, dir = Dir, - alarmed = Alarmed, - os = OS, - port = Port}) -> - CurrentFree = get_disk_free(Dir, OS, Port), + alarmed = Alarmed}) -> + CurrentFree = get_disk_free(Dir), %% note: 'NaN' is considered to be less than a number NewAlarmed = CurrentFree < Limit, case {Alarmed, NewAlarmed} of @@ -300,103 +243,16 @@ internal_update(State = #state{limit = Limit, ets:insert(?ETS_NAME, {disk_free, CurrentFree}), State#state{alarmed = NewAlarmed, actual = CurrentFree}. -get_disk_free(Dir, {unix, Sun}, Port) - when Sun =:= sunos; Sun =:= sunos4; Sun =:= solaris -> - Df = find_cmd("df"), - parse_free_unix(run_port_cmd(Df ++ " -k '" ++ Dir ++ "'", Port)); -get_disk_free(Dir, {unix, _}, Port) -> - Df = find_cmd("df"), - parse_free_unix(run_port_cmd(Df ++ " -kP '" ++ Dir ++ "'", Port)); -get_disk_free(Dir, {win32, _}, not_used) -> - % Dir: - % "c:/Users/username/AppData/Roaming/RabbitMQ/db/rabbit2@username-z01-mnesia" - case win32_get_drive_letter(Dir) of - error -> - ?LOG_WARNING("Expected the mnesia directory absolute " - "path to start with a drive letter like " - "'C:'. 
The path is: '~tp'", [Dir]), - {ok, Free} = win32_get_disk_free_dir(Dir), - Free; - DriveLetter -> - % Note: yes, "$\s" is the $char sequence for an ASCII space - F = fun([D, $:, $\\, $\s | _]) when D =:= DriveLetter -> - true; - (_) -> false - end, - % Note: we can use os_mon_sysinfo:get_disk_info/1 after the following is fixed: - % https://github.com/erlang/otp/issues/6156 - try - % Note: DriveInfoStr is in this format - % "C:\\ DRIVE_FIXED 720441434112 1013310287872 720441434112\n" - Lines = os_mon_sysinfo:get_disk_info(), - [DriveInfoStr] = lists:filter(F, Lines), - [DriveLetter, $:, $\\, $\s | DriveInfo] = DriveInfoStr, - - % https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-getdiskfreespaceexa - % lib/os_mon/c_src/win32sysinfo.c: - % if (fpGetDiskFreeSpaceEx(drive,&availbytes,&totbytes,&totbytesfree)){ - % sprintf(answer,"%s DRIVE_FIXED %I64u %I64u %I64u\n",drive,availbytes,totbytes,totbytesfree); - ["DRIVE_FIXED", FreeBytesAvailableToCallerStr, - _TotalNumberOfBytesStr, _TotalNumberOfFreeBytesStr] = string:tokens(DriveInfo, " "), - list_to_integer(FreeBytesAvailableToCallerStr) - catch _:{timeout, _}:_ -> - %% could not compute the result - 'NaN'; - _:Reason:_ -> - ?LOG_WARNING("Free disk space monitoring failed to retrieve the amount of available space: ~p", [Reason]), - %% could not compute the result - 'NaN' - end - end. - -parse_free_unix(Str) -> - case string:tokens(Str, "\n") of - [_, S | _] -> case string:tokens(S, " \t") of - [_, _, _, Free | _] -> list_to_integer(Free) * 1024; - _ -> exit({unparseable, Str}) - end; - _ -> exit({unparseable, Str}) - end. - -win32_get_drive_letter([DriveLetter, $:, $/ | _]) when (DriveLetter >= $a andalso DriveLetter =< $z) -> - % Note: os_mon_sysinfo returns drives with uppercase letters, so uppercase it here - DriveLetter - 32; -win32_get_drive_letter([DriveLetter, $:, $/ | _]) when (DriveLetter >= $A andalso DriveLetter =< $Z) -> - DriveLetter; -win32_get_drive_letter(_) -> - error. 
- -win32_get_disk_free_dir(Dir) -> - %% On Windows, the Win32 API enforces a limit of 260 characters - %% (MAX_PATH). If we call `dir` with a path longer than that, it - %% fails with "File not found". Starting with Windows 10 version - %% 1607, this limit was removed, but the administrator has to - %% configure that. - %% - %% NTFS supports paths up to 32767 characters. Therefore, paths - %% longer than 260 characters exist but they are "inaccessible" to - %% `dir`. - %% - %% A workaround is to tell the Win32 API to not parse a path and - %% just pass it raw to the underlying filesystem. To do this, the - %% path must be prepended with "\\?\". That's what we do here. - %% - %% However, the underlying filesystem may not support forward - %% slashes transparently, as the Win32 API does. Therefore, we - %% convert all forward slashes to backslashes. - %% - %% See the following page to learn more about this: - %% https://ss64.com/nt/syntax-filenames.html - RawDir = "\\\\?\\" ++ string:replace(Dir, "/", "\\", all), - case run_os_cmd("dir /-C /W \"" ++ RawDir ++ "\"") of - {error, Error} -> - exit({unparseable, Error}); - CommandResult -> - LastLine0 = lists:last(string:tokens(CommandResult, "\r\n")), - LastLine1 = lists:reverse(LastLine0), - {match, [Free]} = re:run(LastLine1, "(\\d+)", - [{capture, all_but_first, list}]), - {ok, list_to_integer(lists:reverse(Free))} +-spec get_disk_free(file:filename_all()) -> + AvailableBytes :: non_neg_integer() | 'NaN'. +get_disk_free(Dir) -> + case disksup:get_disk_info(Dir) of + [{D, 0, 0, 0, 0}] when D =:= Dir orelse D =:= "none" -> + 'NaN'; + [{_MountPoint, _TotalKiB, AvailableKiB, _Capacity}] -> + AvailableKiB * 1024; + _DiskInfo -> + 'NaN' end. 
interpret_limit({mem_relative, Relative}) @@ -437,8 +293,8 @@ interval(#state{limit = Limit, enable(#state{retries = 0} = State) -> ?LOG_ERROR("Free disk space monitor failed to start!"), State; -enable(#state{dir = Dir, os = OS, port = Port} = State) -> - enable_handle_disk_free(catch get_disk_free(Dir, OS, Port), State). +enable(#state{dir = Dir} = State) -> + enable_handle_disk_free(get_disk_free(Dir), State). enable_handle_disk_free(DiskFree, State) when is_integer(DiskFree) -> enable_handle_total_memory(catch vm_memory_monitor:get_total_memory(), DiskFree, State); @@ -461,20 +317,3 @@ enable_handle_total_memory(Error, _DiskFree, #state{interval = Interval, retries [Retries, Error]), erlang:send_after(Interval, self(), try_enable), State#state{enabled = false}. - -run_os_cmd(Cmd) -> - Pid = self(), - Ref = make_ref(), - CmdFun = fun() -> - CmdResult = rabbit_misc:os_cmd(Cmd), - Pid ! {Pid, Ref, CmdResult} - end, - CmdPid = spawn(CmdFun), - receive - {Pid, Ref, CmdResult} -> - CmdResult - after 5000 -> - exit(CmdPid, kill), - ?LOG_ERROR("Command timed out: '~ts'", [Cmd]), - {error, timeout} - end. diff --git a/deps/rabbit/test/unit_disk_monitor_SUITE.erl b/deps/rabbit/test/unit_disk_monitor_SUITE.erl index 3058cc904eb..2a795f01376 100644 --- a/deps/rabbit/test/unit_disk_monitor_SUITE.erl +++ b/deps/rabbit/test/unit_disk_monitor_SUITE.erl @@ -9,9 +9,7 @@ -include_lib("eunit/include/eunit.hrl"). --compile(export_all). - --define(TIMEOUT, 30000). +-compile([nowarn_export_all, export_all]). 
all() -> [ From d57a1c264697fff068361f913a5a017c6e093b98 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 16 Oct 2025 10:06:09 -0400 Subject: [PATCH 06/10] rabbit.schema: Add config options for per-queue-type disk limits --- deps/rabbit/priv/schema/rabbit.schema | 63 +++++++++++++++++++ .../config_schema_SUITE_data/rabbit.snippets | 8 +++ 2 files changed, 71 insertions(+) diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index e6017f23f2a..80d7216ddd0 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -1238,6 +1238,69 @@ fun(Conf) -> end end}. +%% Per-queue-type / per-mount-point disk alarms +{mapping, "disk_free_limits.$id.mount_point", "rabbit.disk_free_limits", [ + {datatype, [string]} +]}. +{mapping, "disk_free_limits.$id.absolute", "rabbit.disk_free_limits", [ + {datatype, [integer, string]}, + {validators, ["is_supported_information_unit"]} +]}. +{mapping, "disk_free_limits.$id.queue_types", "rabbit.disk_free_limits", [ + {datatype, [binary]} +]}. 
+
+%% Folds disk_free_limits.$id.* into #{Id => {MountPoint, Limit, QueueTypes}}.
+{translation, "rabbit.disk_free_limits",
+fun(Conf) ->
+    case cuttlefish_variable:filter_by_prefix("disk_free_limits", Conf) of
+        [] -> cuttlefish:unset();
+        Settings ->
+            lists:foldl(
+              fun ({["disk_free_limits", Id, "mount_point"], Path}, Acc) ->
+                      maps:update_with(
+                        list_to_binary(Id),
+                        fun ({undefined, Limit, QTs}) ->
+                                {Path, Limit, QTs};
+                            ({ExistingPath, Limit, QTs}) ->
+                                cuttlefish:warn(
+                                  "Duplicate mount point set for ~ts, using "
+                                  "'~ts' instead of '~ts'",
+                                  [Id, Path, ExistingPath]),
+                                {Path, Limit, QTs}
+                        end, {Path, undefined, []}, Acc);
+                  ({["disk_free_limits", Id, "absolute"], Limit}, Acc) ->
+                      maps:update_with(
+                        list_to_binary(Id),
+                        fun ({Path, undefined, QTypes}) ->
+                                {Path, Limit, QTypes};
+                            ({Path, ExistingLimit, QTypes}) ->
+                                cuttlefish:warn(
+                                  "Duplicate disk free limits set for ~ts, "
+                                  "using ~tp instead of ~tp",
+                                  [Id, Limit, ExistingLimit]),
+                                {Path, Limit, QTypes}
+                        end, {undefined, Limit, []}, Acc);
+                  ({["disk_free_limits", Id, "queue_types"], QTs0}, Acc) ->
+                      %% Split on every comma (not only the first) and trim.
+                      QTs = [string:trim(QT) || QT <- string:split(QTs0, ",", all)],
+                      maps:update_with(
+                        list_to_binary(Id),
+                        fun ({Path, Limit, []}) ->
+                                {Path, Limit, QTs};
+                            ({Path, Limit, ExistingQTs}) ->
+                                cuttlefish:warn("Duplicate queue types set "
+                                                "for disk free limit of ~ts, "
+                                                "using ~tp instead of ~tp",
+                                                [Id, QTs, ExistingQTs]),
+                                {Path, Limit, QTs}
+                        end, {undefined, undefined, QTs}, Acc);
+                  (Other, _Acc) ->
+                      cuttlefish:invalid(io_lib:format("~p is invalid", [Other]))
+              end, #{}, Settings)
+        end
+end}.
+ %% %% Clustering %% ===================== diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index a68868e4631..6c0effc96b6 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -467,6 +467,14 @@ tcp_listen_options.exit_on_close = false", "total_memory_available_override_value = 1024MB", [{rabbit,[{total_memory_available_override_value, "1024MB"}]}], []}, + {disk_free_limits_per_mount_point, + "disk_free_limits.replicated.mount_point = /data/replicated + disk_free_limits.replicated.absolute = 2GB + disk_free_limits.replicated.queue_types = stream,quorum", + [{rabbit,[{disk_free_limits, + #{<<"replicated">> => {"/data/replicated", "2GB", + [<<"stream">>, <<"quorum">>]}}}]}], + []}, {ranch_connection_max, "ranch_connection_max = 999", [{rabbit,[{ranch_connection_max, 999}]}], From 19be90624b7b0eb4993e0feeea60a2b2833c26f1 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Tue, 21 Oct 2025 16:04:09 -0400 Subject: [PATCH 07/10] rabbit_disk_monitor: Monitor per-queue-type mount points --- deps/rabbit/src/rabbit_disk_monitor.erl | 282 +++++++++++++++++++----- 1 file changed, 225 insertions(+), 57 deletions(-) diff --git a/deps/rabbit/src/rabbit_disk_monitor.erl b/deps/rabbit/src/rabbit_disk_monitor.erl index 12f8ae29d29..41a83689c89 100644 --- a/deps/rabbit/src/rabbit_disk_monitor.erl +++ b/deps/rabbit/src/rabbit_disk_monitor.erl @@ -32,19 +32,32 @@ -define(SERVER, ?MODULE). -define(ETS_NAME, ?MODULE). +-define(MOUNT_POINT_ETS_NAME, rabbit_disk_monitor_per_mount_point). -define(DEFAULT_MIN_DISK_CHECK_INTERVAL, 100). -define(DEFAULT_MAX_DISK_CHECK_INTERVAL, 10000). -define(DEFAULT_DISK_FREE_LIMIT, 50000000). %% 250MB/s i.e. 250kB/ms -define(FAST_RATE, (250 * 1000)). 
+-record(mount_point, + {%% name set in configuration + id :: binary(), + %% minimum bytes available + limit :: non_neg_integer(), + %% detected available disk space in bytes + available = 'NaN' :: non_neg_integer() | 'NaN', + %% set of queue types which should be blocked if the limit is exceeded + queue_types :: sets:set(rabbit_queue_type:queue_type())}). + -record(state, { - %% monitor partition on which this directory resides + %% monitor partition on which the data directory resides dir, %% configured limit in bytes limit, %% last known free disk space amount in bytes actual, + %% extra file systems to monitor mapped to the queue types to + mount_points = #{} :: mount_points(), %% minimum check interval min_interval, %% maximum check interval @@ -67,6 +80,19 @@ -type disk_free_limit() :: integer() | {'absolute', integer()} | string() | {'mem_relative', float() | integer()}. +-type mount_points() :: #{file:filename_all() => #mount_point{}}. + +%%---------------------------------------------------------------------------- + +%% This needs to wait until the recovery phase so that queue types have a +%% chance to register themselves. +-rabbit_boot_step({monitor_mount_points, + [{description, "monitor per-queue-type mount points"}, + {mfa, {gen_server, call, + [?MODULE, monitor_mount_points]}}, + {requires, recovery}, + {enables, routing_ready}]}). + %%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- @@ -112,12 +138,11 @@ start_link(Args) -> gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). 
init([Limit]) -> - Dir = dir(), {ok, Retries} = application:get_env(rabbit, disk_monitor_failure_retries), {ok, Interval} = application:get_env(rabbit, disk_monitor_failure_retry_interval), ?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]), - State0 = #state{dir = Dir, - alarmed = false, + ?MOUNT_POINT_ETS_NAME = ets:new(?MOUNT_POINT_ETS_NAME, [protected, set, named_table]), + State0 = #state{alarmed = false, enabled = true, limit = Limit, retries = Retries, @@ -166,6 +191,15 @@ handle_call({set_enabled, _Enabled = false}, _From, State = #state{enabled = fal ?LOG_INFO("Free disk space monitor was already disabled"), {reply, ok, State#state{enabled = false}}; +handle_call(monitor_mount_points, _From, State) -> + case State of + #state{enabled = true} -> + State1 = State#state{mount_points = mount_points()}, + {reply, ok, internal_update(State1)}; + #state{enabled = false} -> + {reply, ok, State} + end; + handle_call(_Request, _From, State) -> {noreply, State}. @@ -205,9 +239,6 @@ safe_ets_lookup(Key, Default) -> Default end. -% the partition / drive containing this directory will be monitored -dir() -> rabbit:data_dir(). - set_min_check_interval(MinInterval, State) -> ets:insert(?ETS_NAME, {min_check_interval, MinInterval}), State#state{min_interval = MinInterval}. @@ -224,36 +255,94 @@ set_disk_limits(State, Limit0) -> ets:insert(?ETS_NAME, {disk_free_limit, Limit}), internal_update(State1). 
-internal_update(State = #state{limit = Limit, - dir = Dir, - alarmed = Alarmed}) -> - CurrentFree = get_disk_free(Dir), +internal_update(#state{limit = DataDirLimit, + dir = Dir, + mount_points = MountPoints, + alarmed = Alarmed} = State) -> + DiskFree = get_disk_free(State), + DataDirFree = maps:get(Dir, DiskFree, 'NaN'), %% note: 'NaN' is considered to be less than a number - NewAlarmed = CurrentFree < Limit, + NewAlarmed = DataDirFree < DataDirLimit, case {Alarmed, NewAlarmed} of {false, true} -> - emit_update_info("insufficient", CurrentFree, Limit), + emit_update_info("insufficient", DataDirFree, DataDirLimit), rabbit_alarm:set_alarm({{resource_limit, disk, node()}, []}); {true, false} -> - emit_update_info("sufficient", CurrentFree, Limit), + emit_update_info("sufficient", DataDirFree, DataDirLimit), rabbit_alarm:clear_alarm({resource_limit, disk, node()}); _ -> ok end, - ets:insert(?ETS_NAME, {disk_free, CurrentFree}), - State#state{alarmed = NewAlarmed, actual = CurrentFree}. - --spec get_disk_free(file:filename_all()) -> - AvailableBytes :: non_neg_integer() | 'NaN'. -get_disk_free(Dir) -> - case disksup:get_disk_info(Dir) of - [{D, 0, 0, 0, 0}] when D =:= Dir orelse D =:= "none" -> - 'NaN'; - [{_MountPoint, _TotalKiB, AvailableKiB, _Capacity}] -> - AvailableKiB * 1024; - _DiskInfo -> - 'NaN' - end. 
+ ets:insert(?ETS_NAME, {disk_free, DataDirFree}), + + NewMountPoints = maps:map( + fun(Path, MP) -> + Available = maps:get(Path, DiskFree, 'NaN'), + MP#mount_point{available = Available} + end, MountPoints), + ets:insert( + ?MOUNT_POINT_ETS_NAME, + [{Id, Available, Limit} + || _Path := #mount_point{id = Id, + available = Available, + limit = Limit} <- NewMountPoints]), + + AlarmedMPs = alarmed_mount_points(MountPoints), + NewAlarmedMPs = alarmed_mount_points(NewMountPoints), + + NewlyClearedMPs = sets:subtract(AlarmedMPs, NewAlarmedMPs), + NewlyAlarmedMPs = sets:subtract(NewAlarmedMPs, AlarmedMPs), + + lists:foreach( + fun(Path) -> + #mount_point{id = Id, + limit = Limit, + available = Available} = maps:get(Path, + NewMountPoints), + emit_update_info(Id, "insufficient", Available, Limit) + end, lists:sort(sets:to_list(NewlyAlarmedMPs))), + %% TODO: rabbit_alarm:set_alarm/1 for affected queue types + lists:foreach( + fun(Path) -> + #mount_point{id = Id, + limit = Limit, + available = Available} = maps:get(Path, + NewMountPoints), + emit_update_info(Id, "sufficient", Available, Limit) + end, lists:sort(sets:to_list(NewlyClearedMPs))), + %% TODO: rabbit_alarm:clear_alarm/1 for affected queue types + + State#state{alarmed = NewAlarmed, + actual = DataDirFree, + mount_points = NewMountPoints}. + +emit_update_info(StateStr, CurrentFree, Limit) -> + ?LOG_INFO( + "Free disk space is ~ts. Free bytes: ~b. Limit: ~b", + [StateStr, CurrentFree, Limit]). +emit_update_info(MountPoint, StateStr, CurrentFree, Limit) -> + ?LOG_INFO( + "Free space of disk '~ts' is ~ts. Free bytes: ~b. Limit: ~b", + [MountPoint, StateStr, CurrentFree, Limit]). + +-spec alarmed_mount_points(mount_points()) -> + sets:set(file:filename_all()). +alarmed_mount_points(MountPoints) -> + maps:fold( + fun (Path, #mount_point{available = Available, limit = Limit}, Acc) + when Available < Limit -> + sets:add_element(Path, Acc); + (_Path, _MP, Acc) -> + Acc + end, sets:new([{version, 2}]), MountPoints). 
+ +-spec get_disk_free(#state{}) -> + #{file:filename_all() => AvailableB :: non_neg_integer()}. +get_disk_free(#state{dir = DataDir, mount_points = MountPoints}) -> + #{MountPoint => AvailableKiB * 1024 || + {MountPoint, Total, AvailableKiB, Capacity} <- disksup:get_disk_info(), + {Total, AvailableKiB, Capacity} =/= {0, 0, 0}, + MountPoint =:= DataDir orelse is_map_key(MountPoint, MountPoints)}. interpret_limit({mem_relative, Relative}) when is_number(Relative) -> @@ -269,11 +358,6 @@ interpret_limit(Absolute) -> ?DEFAULT_DISK_FREE_LIMIT end. -emit_update_info(StateStr, CurrentFree, Limit) -> - ?LOG_INFO( - "Free disk space is ~ts. Free bytes: ~b. Limit: ~b", - [StateStr, CurrentFree, Limit]). - start_timer(State) -> State#state{timer = erlang:send_after(interval(State), self(), update)}. @@ -290,30 +374,114 @@ interval(#state{limit = Limit, IdealInterval = 2 * (Actual - Limit) / ?FAST_RATE, trunc(erlang:max(MinInterval, erlang:min(MaxInterval, IdealInterval))). +-spec mount_points() -> + #{MountPoint :: file:filename_all() => #mount_point{}}. +mount_points() -> + case application:get_env(rabbit, disk_free_limits) of + {ok, Limits} -> + maps:fold( + fun(Id, {Path, Limit0, QTypes}, Acc) -> + Res = rabbit_resource_monitor_misc:parse_information_unit( + Limit0), + case Res of + {ok, Limit} -> + {Known, Unknown} = resolve_queue_types(QTypes), + case Unknown of + [_ | _] -> + ?LOG_WARNING( + "Unknown queue types configured for " + "disk '~ts': ~ts", + [Id, lists:join(", ", Unknown)]), + ok; + _ -> + ok + end, + case Known of + [] -> + ?LOG_ERROR("No known queue types " + "configured for disk '~ts'. " + "The disk will not be " + "monitored for free " + "disk space.", [Id]), + Acc; + _ -> + QTs = sets:from_list(Known, + [{version, 2}]), + MP = #mount_point{id = Id, + limit = Limit, + queue_types = QTs}, + Acc#{Path => MP} + end; + {error, parse_error} -> + ?LOG_ERROR("Unable to parse free disk limit " + "'~ts' for disk '~ts'. 
The disk will " + "not be monitored for free space.", + [Limit0, Id]), + Acc + end + end, #{}, Limits); + undefined -> + #{} + end. + +resolve_queue_types(QTs) -> + resolve_queue_types(QTs, {[], []}). + +resolve_queue_types([], Acc) -> + Acc; +resolve_queue_types([QT | Rest], {Known, Unknown}) -> + case rabbit_registry:lookup_type_module(queue, QT) of + {ok, TypeModule} -> + resolve_queue_types(Rest, {[TypeModule | Known], Unknown}); + {error, not_found} -> + resolve_queue_types(Rest, {Known, [QT | Unknown]}) + end. + enable(#state{retries = 0} = State) -> ?LOG_ERROR("Free disk space monitor failed to start!"), State; -enable(#state{dir = Dir} = State) -> - enable_handle_disk_free(get_disk_free(Dir), State). - -enable_handle_disk_free(DiskFree, State) when is_integer(DiskFree) -> - enable_handle_total_memory(catch vm_memory_monitor:get_total_memory(), DiskFree, State); -enable_handle_disk_free(Error, #state{interval = Interval, retries = Retries} = State) -> - ?LOG_WARNING("Free disk space monitor encountered an error " - "(e.g. failed to parse output from OS tools). " - "Retries left: ~b Error:~n~tp", - [Retries, Error]), - erlang:send_after(Interval, self(), try_enable), - State#state{enabled = false}. - -enable_handle_total_memory(TotalMemory, DiskFree, #state{limit = Limit} = State) when is_integer(TotalMemory) -> - ?LOG_INFO("Enabling free disk space monitoring " - "(disk free space: ~b, total memory: ~b)", [DiskFree, TotalMemory]), - start_timer(set_disk_limits(State, Limit)); -enable_handle_total_memory(Error, _DiskFree, #state{interval = Interval, retries = Retries} = State) -> - ?LOG_WARNING("Free disk space monitor encountered an error " - "retrieving total memory. " - "Retries left: ~b Error:~n~tp", - [Retries, Error]), - erlang:send_after(Interval, self(), try_enable), - State#state{enabled = false}. 
+enable(#state{dir = undefined,
+              interval = Interval,
+              retries = Retries} = State) ->
+    case resolve_data_dir() of
+        {ok, MountPoint} ->
+            enable(State#state{dir = MountPoint});
+        {error, Reason} ->
+            ?LOG_WARNING("Free disk space monitor encountered an error "
+                         "resolving the data directory '~ts'. Retries left: "
+                         "~b Error:~n~tp",
+                         [rabbit:data_dir(), Retries, Reason]),
+            erlang:send_after(Interval, self(), try_enable),
+            State#state{enabled = false}
+    end;
+enable(#state{dir = Dir,
+              retries = Retries,
+              interval = Interval,
+              limit = Limit} = State) ->
+    DiskFree = get_disk_free(State), % may have no entry for Dir
+    case vm_memory_monitor:get_total_memory() of
+        TotalMemory when is_integer(TotalMemory) ->
+            ?LOG_INFO("Enabling free disk space monitoring (data dir free "
+                      "space: ~tp, total memory: ~b)",
+                      [maps:get(Dir, DiskFree, unknown), TotalMemory]),
+            start_timer(set_disk_limits(State, Limit));
+        unknown ->
+            ?LOG_WARNING("Free disk space monitor could not determine total "
+                         "memory. Retries left: ~b", [Retries]),
+            erlang:send_after(Interval, self(), try_enable),
+            State#state{enabled = false}
+    end.
+
+resolve_data_dir() ->
+    case disksup:get_disk_info(rabbit:data_dir()) of
+        [{"none", 0, 0, 0}] ->
+            {error, disksup_not_available};
+        [{MountPoint, 0, 0, 0}] ->
+            {error, {cannot_determine_space, MountPoint}};
+        [{MountPoint, _TotalKiB, _AvailableKiB, _Capacity}] ->
+            {ok, MountPoint};
+        [] ->
+            {error, no_disk_info};
+        [_ | _] = Infos ->
+            {error, {multiple_disks, length(Infos)}}
+    end.
From 0e4f126fd1235d7e1955d262274330ec0f570829 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 23 Oct 2025 12:56:29 -0400 Subject: [PATCH 08/10] rabbit_alarm: Add a helper to format resource alarm sources --- deps/rabbit/src/rabbit_alarm.erl | 30 +++++++++++++++--------------- deps/rabbit/src/rabbit_reader.erl | 9 +++------ 2 files changed, 18 insertions(+), 21 deletions(-) diff --git a/deps/rabbit/src/rabbit_alarm.erl b/deps/rabbit/src/rabbit_alarm.erl index 84c5188f159..52cd78db9e1 100644 --- a/deps/rabbit/src/rabbit_alarm.erl +++ b/deps/rabbit/src/rabbit_alarm.erl @@ -24,6 +24,7 @@ -export([start_link/0, start/0, stop/0, register/2, set_alarm/1, clear_alarm/1, get_alarms/0, get_alarms/1, get_local_alarms/0, get_local_alarms/1, on_node_up/1, on_node_down/1, + format_resource_alarm_source/1, format_as_map/1, format_as_maps/1, is_local/1]). -export([init/1, handle_call/2, handle_event/2, handle_info/2, @@ -124,25 +125,23 @@ is_local({file_descriptor_limit, _}) -> true; is_local({{resource_limit, _Resource, Node}, _}) when Node =:= node() -> true; is_local({{resource_limit, _Resource, Node}, _}) when Node =/= node() -> false. +-spec format_resource_alarm_source(resource_alarm_source()) -> iodata(). +format_resource_alarm_source(disk) -> + ?DISK_SPACE_RESOURCE; +format_resource_alarm_source(memory) -> + ?MEMORY_RESOURCE; +format_resource_alarm_source(Unknown) -> + io_lib:format("~w", [Unknown]). + -spec format_as_map(alarm()) -> #{binary() => term()}. 
format_as_map(file_descriptor_limit) -> #{ <<"resource">> => ?FILE_DESCRIPTOR_RESOURCE, <<"node">> => node() }; -format_as_map({resource_limit, disk, Node}) -> - #{ - <<"resource">> => ?DISK_SPACE_RESOURCE, - <<"node">> => Node - }; -format_as_map({resource_limit, memory, Node}) -> - #{ - <<"resource">> => ?MEMORY_RESOURCE, - <<"node">> => Node - }; format_as_map({resource_limit, Limit, Node}) -> #{ - <<"resource">> => rabbit_data_coercion:to_binary(Limit), + <<"resource">> => iolist_to_binary(format_resource_alarm_source(Limit)), <<"node">> => Node }. @@ -237,7 +236,7 @@ handle_event({node_down, Node}, #alarms{alarmed_nodes = AN} = State) -> AlarmsForDeadNode = maps:get(Node, AN, []), {ok, lists:foldr(fun(Source, AccState) -> ?LOG_WARNING("~ts resource limit alarm cleared for dead node ~tp", - [Source, Node]), + [format_resource_alarm_source(Source), Node]), maybe_alert(fun map_unappend/3, Node, Source, false, AccState) end, State, AlarmsForDeadNode)}; @@ -283,7 +282,8 @@ maybe_alert(UpdateFun, Node, Source, WasAlertAdded, end, AN1), case StillHasAlerts of true -> ok; - false -> ?LOG_WARNING("~ts resource limit alarm cleared across the cluster", [Source]) + false -> ?LOG_WARNING("~ts resource limit alarm cleared across the cluster", + [format_resource_alarm_source(Source)]) end, Alert = {WasAlertAdded, StillHasAlerts, Node}, case node() of @@ -326,7 +326,7 @@ handle_set_resource_alarm(Source, Node, State) -> "**********************************************************~n" "*** Publishers will be blocked until this alarm clears ***~n" "**********************************************************~n", - [Source, Node]), + [format_resource_alarm_source(Source), Node]), {ok, maybe_alert(fun map_append/3, Node, Source, true, State)}. 
handle_set_alarm({file_descriptor_limit, []}, State) -> @@ -342,7 +342,7 @@ handle_set_alarm(Alarm, State) -> handle_clear_resource_alarm(Source, Node, State) -> ?LOG_WARNING("~ts resource limit alarm cleared on node ~tp", - [Source, Node]), + [format_resource_alarm_source(Source), Node]), {ok, maybe_alert(fun map_unappend/3, Node, Source, false, State)}. handle_clear_alarm(file_descriptor_limit, State) -> diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index fe3e74c7b92..d43a7f3a9fc 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -1727,14 +1727,11 @@ send_error_on_channel0_and_close(Channel, Protocol, Reason, State) -> blocked_by_message(#throttle{blocked_by = Reasons}) -> %% we don't want to report internal flow as a reason here since %% it is entirely transient - Reasons1 = sets:del_element(flow, Reasons), - RStr = string:join([format_blocked_by(R) || R <- sets:to_list(Reasons1)], " & "), + RStr = lists:join([rabbit_alarm:format_resource_alarm_source(R) || + {resource, R} <- sets:to_list(Reasons)], + " & "), list_to_binary(rabbit_misc:format("low on ~ts", [RStr])). -format_blocked_by({resource, memory}) -> "memory"; -format_blocked_by({resource, disk}) -> "disk"; -format_blocked_by({resource, disc}) -> "disk". - update_last_blocked_at(Throttle) -> Throttle#throttle{last_blocked_at = erlang:monotonic_time()}. From 4776581cd2796119f4a655fabd0a23d73b812c9d Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 23 Oct 2025 16:50:58 -0400 Subject: [PATCH 09/10] Set per-queue-type disk alarms for configured mount points This introduces a new variant of `rabbit_alarm:resource_alarm_source()`: `{disk, QueueType}` which triggers when the configured mount point for queue type(s) fall under their limit of available space. 
--- deps/rabbit/src/rabbit_alarm.erl | 7 +++++- deps/rabbit/src/rabbit_disk_monitor.erl | 29 +++++++++++++++++++++++-- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/deps/rabbit/src/rabbit_alarm.erl b/deps/rabbit/src/rabbit_alarm.erl index 52cd78db9e1..f4af0dae2d7 100644 --- a/deps/rabbit/src/rabbit_alarm.erl +++ b/deps/rabbit/src/rabbit_alarm.erl @@ -49,7 +49,10 @@ -export_type([alarm/0]). -type local_alarm() :: 'file_descriptor_limit'. --type resource_alarm_source() :: 'disk' | 'memory'. +-type resource_alarm_source() :: + memory + | disk + | {disk, rabbit_queue_type:queue_type()}. -type resource_alarm() :: {resource_limit, resource_alarm_source(), node()}. -type alarm() :: local_alarm() | resource_alarm(). -type resource_alert() :: {WasAlarmSetForNode :: boolean(), @@ -128,6 +131,8 @@ is_local({{resource_limit, _Resource, Node}, _}) when Node =/= node() -> false. -spec format_resource_alarm_source(resource_alarm_source()) -> iodata(). format_resource_alarm_source(disk) -> ?DISK_SPACE_RESOURCE; +format_resource_alarm_source({disk, QueueType}) -> + io_lib:format("disk for queue type '~ts'", [QueueType]); format_resource_alarm_source(memory) -> ?MEMORY_RESOURCE; format_resource_alarm_source(Unknown) -> diff --git a/deps/rabbit/src/rabbit_disk_monitor.erl b/deps/rabbit/src/rabbit_disk_monitor.erl index 41a83689c89..1b425de6255 100644 --- a/deps/rabbit/src/rabbit_disk_monitor.erl +++ b/deps/rabbit/src/rabbit_disk_monitor.erl @@ -288,10 +288,14 @@ internal_update(#state{limit = DataDirLimit, limit = Limit} <- NewMountPoints]), AlarmedMPs = alarmed_mount_points(MountPoints), + AlarmedQTs = alarmed_queue_types(MountPoints), NewAlarmedMPs = alarmed_mount_points(NewMountPoints), + NewAlarmedQTs = alarmed_queue_types(NewMountPoints), NewlyClearedMPs = sets:subtract(AlarmedMPs, NewAlarmedMPs), + NewlyClearedQTs = sets:subtract(AlarmedQTs, NewAlarmedQTs), NewlyAlarmedMPs = sets:subtract(NewAlarmedMPs, AlarmedMPs), + NewlyAlarmedQTs = 
sets:subtract(NewAlarmedQTs, AlarmedQTs), lists:foreach( fun(Path) -> @@ -301,7 +305,11 @@ internal_update(#state{limit = DataDirLimit, NewMountPoints), emit_update_info(Id, "insufficient", Available, Limit) end, lists:sort(sets:to_list(NewlyAlarmedMPs))), - %% TODO: rabbit_alarm:set_alarm/1 for affected queue types + lists:foreach( + fun(QT) -> + Alarm = {resource_limit, {disk, QT}, node()}, + rabbit_alarm:set_alarm({Alarm, []}) + end, lists:sort(sets:to_list(NewlyAlarmedQTs))), lists:foreach( fun(Path) -> #mount_point{id = Id, @@ -310,7 +318,11 @@ internal_update(#state{limit = DataDirLimit, NewMountPoints), emit_update_info(Id, "sufficient", Available, Limit) end, lists:sort(sets:to_list(NewlyClearedMPs))), - %% TODO: rabbit_alarm:clear_alarm/1 for affected queue types + lists:foreach( + fun(QT) -> + Alarm = {resource_limit, {disk, QT}, node()}, + rabbit_alarm:clear_alarm(Alarm) + end, lists:sort(sets:to_list(NewlyClearedQTs))), State#state{alarmed = NewAlarmed, actual = DataDirFree, @@ -336,6 +348,19 @@ alarmed_mount_points(MountPoints) -> Acc end, sets:new([{version, 2}]), MountPoints). +-spec alarmed_queue_types(mount_points()) -> + sets:set(module()). +alarmed_queue_types(MountPoints) -> + maps:fold( + fun (_Path, #mount_point{available = Available, + limit = Limit, + queue_types = QTs}, Acc) + when Available < Limit -> + sets:union(QTs, Acc); + (_Path, _MP, Acc) -> + Acc + end, sets:new([{version, 2}]), MountPoints). + -spec get_disk_free(#state{}) -> #{file:filename_all() => AvailableB :: non_neg_integer()}. get_disk_free(#state{dir = DataDir, mount_points = MountPoints}) -> From c96fb3aee48b7acb6849eff55633568c46f03ce7 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 23 Oct 2025 17:53:33 -0400 Subject: [PATCH 10/10] AMQP 0-9-1: Handle per-queue-type disk alarms This covers both network and direct connections for 0-9-1. 
We store a set of the queue types which have been published into on both a channel and connection level since blocking is done on the connection level but only the channel knows what queue types have been published. Then when the published queue types or the set of alarms changes, the connection evaluates whether it is affected by the alarm. If not it may publish but once a channel publishes to an alarmed queue type the connection then blocks until the channel exits or the alarm clears. --- deps/amqp_client/src/amqp_gen_connection.erl | 26 ++++++- deps/rabbit/src/rabbit_channel.erl | 28 +++++++- deps/rabbit/src/rabbit_reader.erl | 73 ++++++++++++++++---- 3 files changed, 107 insertions(+), 20 deletions(-) diff --git a/deps/amqp_client/src/amqp_gen_connection.erl b/deps/amqp_client/src/amqp_gen_connection.erl index 886a06d45f0..7acf1f8210f 100644 --- a/deps/amqp_client/src/amqp_gen_connection.erl +++ b/deps/amqp_client/src/amqp_gen_connection.erl @@ -32,6 +32,7 @@ %% connection.block, connection.unblock handler block_handler, blocked_by = sets:new([{version, 2}]), + queue_types_published = sets:new([{version, 2}]), closing = false %% #closing{} | false }). 
@@ -214,7 +215,7 @@ handle_cast({register_blocked_handler, HandlerPid},
     {noreply, State1};
 handle_cast({conserve_resources, Source, Conserve},
             #state{blocked_by = BlockedBy} = State) ->
-    WasNotBlocked = sets:is_empty(BlockedBy),
+    WasBlocked = should_block(State),
     BlockedBy1 = case Conserve of
                      true ->
                          sets:add_element(Source, BlockedBy);
@@ -222,10 +223,22 @@ handle_cast({conserve_resources, Source, Conserve},
                          sets:del_element(Source, BlockedBy)
                  end,
     State1 = State#state{blocked_by = BlockedBy1},
-    case sets:is_empty(BlockedBy1) of
+    case not should_block(State1) of % true: no alarm applies any more
         true ->
             handle_method(#'connection.unblocked'{}, State1);
-        false when WasNotBlocked ->
+        false when not WasBlocked ->
+            handle_method(#'connection.blocked'{}, State1);
+        false ->
+            {noreply, State1}
+    end;
+handle_cast({channel_published_to_queue_type, _ChPid, QT},
+            #state{queue_types_published = QTs} = State) ->
+    WasBlocked = should_block(State),
+    State1 = State#state{queue_types_published = sets:add_element(QT, QTs)},
+    %% Publishing to a new queue type can only make a pending `{disk, QT}'
+    %% alarm start applying; it can never unblock the connection.
+    case should_block(State1) andalso not WasBlocked of
+        true ->
             handle_method(#'connection.blocked'{}, State1);
         false ->
             {noreply, State1}
@@ -274,6 +287,13 @@ i(Item, #state{module = Mod, module_state = MState}) -> Mod:i(Item, MState).
 register_blocked_handler(Pid, HandlerPid) ->
     gen_server:cast(Pid, {register_blocked_handler, HandlerPid}).
 
+should_block(#state{blocked_by = BlockedBy, queue_types_published = QTs}) ->
+    lists:any(fun ({disk, QT}) ->
+                      sets:is_element(QT, QTs);
+                  (_Resource) ->
+                      true
+              end, sets:to_list(BlockedBy)).
+ %%--------------------------------------------------------------------------- %% Command handling %%--------------------------------------------------------------------------- diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 3c7c865fcb0..162b1d7c520 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -170,7 +170,9 @@ interceptor_state, queue_states, tick_timer, - publishing_mode = false :: boolean() + publishing_mode = false :: boolean(), + queue_types_published = sets:new([{version, 2}]) :: + sets:set(rabbit_queue_type:queue_type()) }). -define(QUEUE, lqueue). @@ -2097,9 +2099,10 @@ deliver_to_queues(XName, ok = process_routing_mandatory(Mandatory, RoutedToQueues, Message, XName, State0), MsgSeqNo = maps:get(correlation, Options, undefined), State1 = process_routing_confirm(MsgSeqNo, QueueNames, XName, State0), + State2 = notify_published_queue_types(Qs, State1), %% Actions must be processed after registering confirms as actions may %% contain rejections of publishes - State = handle_queue_actions(Actions, State1#ch{queue_states = QueueStates}), + State = handle_queue_actions(Actions, State2#ch{queue_states = QueueStates}), case rabbit_event:stats_level(State, #ch.stats_timer) of fine -> ?INCR_STATS(exchange_stats, XName, 1, publish), @@ -2165,6 +2168,27 @@ process_routing_confirm(MsgSeqNo, QRefs, XName, State) State#ch{unconfirmed = rabbit_confirms:insert(MsgSeqNo, QRefs, XName, State#ch.unconfirmed)}. +notify_published_queue_types(Qs, #ch{cfg = #conf{conn_pid = ConnPid}, + queue_types_published = QTs0} = State0) -> + QTs = lists:foldl( + fun(Q0, Acc) -> + Q = case Q0 of + {Q1, _RouteInfo} -> Q1; + _ -> Q0 + end, + QT = amqqueue:get_type(Q), + case sets:is_element(QT, Acc) of + true -> + Acc; + false -> + gen_server:cast( + ConnPid, + {channel_published_to_queue_type, self(), QT}), + sets:add_element(QT, Acc) + end + end, QTs0, Qs), + State0#ch{queue_types_published = QTs}. 
+ confirm(MsgSeqNos, QRef, State = #ch{unconfirmed = UC}) -> %% NOTE: if queue name does not exist here it's likely that the ref also %% does not exist in unconfirmed messages. diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index d43a7f3a9fc..0f5c517fd76 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -113,7 +113,14 @@ %% a set of the reasons why we are %% blocked: {resource, memory}, {resource, disk}. %% More reasons can be added in the future. - blocked_by, + blocked_by = sets:new([{version, 2}]) :: + sets:set(flow | {resource, + rabbit_alarm:resource_alarm_source()}), + %% the set of queue types which have been published to + %% by channels on this connection, used for per-queue + %% type disk alarm blocking + queue_types_published = #{} :: #{ChannelPid :: pid() => + sets:set(rabbit_queue_type:queue_type())}, %% true if received any publishes, false otherwise %% note that this will also be true when connection is %% already blocked @@ -335,7 +342,6 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) -> throttle = #throttle{ last_blocked_at = never, should_block = false, - blocked_by = sets:new([{version, 2}]), connection_blocked_message_sent = false }, proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock)}, @@ -677,6 +683,14 @@ handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) -> %% Ignore, we will emit a created event once we start running. 
State; +handle_other({'$gen_cast', {channel_published_to_queue_type, ChPid, QT}}, + #v1{throttle = Throttle0} = State0) -> + QTs = maps:update_with( + ChPid, fun(ChQTs) -> sets:add_element(QT, ChQTs) end, + sets:from_list([QT], [{version, 2}]), + Throttle0#throttle.queue_types_published), + Throttle = Throttle0#throttle{queue_types_published = QTs}, + control_throttle(State0#v1{throttle = Throttle}); handle_other(ensure_stats, State) -> ensure_stats_timer(State); handle_other(emit_stats, State) -> @@ -1007,14 +1021,21 @@ is_over_node_channel_limit() -> end end. -channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) -> +channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount, + throttle = Throttle0} = State) -> case get({ch_pid, ChPid}) of - undefined -> {undefined, State}; - {Channel, MRef} -> credit_flow:peer_down(ChPid), - erase({channel, Channel}), - erase({ch_pid, ChPid}), - erlang:demonitor(MRef, [flush]), - {Channel, State#v1{channel_count = ChannelCount - 1}} + undefined -> + {undefined, State}; + {Channel, MRef} -> + credit_flow:peer_down(ChPid), + erase({channel, Channel}), + erase({ch_pid, ChPid}), + erlang:demonitor(MRef, [flush]), + QT = maps:remove(ChPid, + Throttle0#throttle.queue_types_published), + Throttle = Throttle0#throttle{queue_types_published = QT}, + {Channel, State#v1{channel_count = ChannelCount - 1, + throttle = Throttle}} end. all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. @@ -1738,22 +1759,44 @@ update_last_blocked_at(Throttle) -> connection_blocked_message_sent( #throttle{connection_blocked_message_sent = BS}) -> BS. -should_send_blocked(Throttle = #throttle{blocked_by = Reasons}) -> +should_send_blocked(Throttle) -> should_block(Throttle) andalso - sets:size(sets:del_element(flow, Reasons)) =/= 0 + do_throttle_reasons_apply(Throttle) andalso not connection_blocked_message_sent(Throttle). 
-should_send_unblocked(Throttle = #throttle{blocked_by = Reasons}) -> +should_send_unblocked(Throttle) -> connection_blocked_message_sent(Throttle) andalso - sets:size(sets:del_element(flow, Reasons)) == 0. + not do_throttle_reasons_apply(Throttle). + +do_throttle_reasons_apply(#throttle{blocked_by = Reasons} = Throttle) -> + lists:any( + fun ({resource, disk}) -> + true; + ({resource, memory}) -> + true; + ({resource, {disk, QT}}) -> + has_published_to_queue_type(QT, Throttle); + (_) -> + %% Flow control should not send connection.blocked + false + end, sets:to_list(Reasons)). + +has_published_to_queue_type(QT, #throttle{queue_types_published = QTs}) -> + rabbit_misc:maps_any( + fun(_ChPid, ChQT) -> sets:is_element(QT, ChQT) end, QTs). %% Returns true if we have a reason to block %% this connection. -has_reasons_to_block(#throttle{blocked_by = Reasons}) -> - sets:size(Reasons) > 0. +has_reasons_to_block(#throttle{blocked_by = Reasons} = Throttle) -> + lists:any( + fun ({resource, {disk, QType}}) -> + has_published_to_queue_type(QType, Throttle); + (_) -> + true + end, sets:to_list(Reasons)). is_blocked_by_flow(#throttle{blocked_by = Reasons}) -> sets:is_element(flow, Reasons).