diff --git a/deps/amqp_client/src/amqp_gen_connection.erl b/deps/amqp_client/src/amqp_gen_connection.erl index 886a06d45f05..7acf1f8210fa 100644 --- a/deps/amqp_client/src/amqp_gen_connection.erl +++ b/deps/amqp_client/src/amqp_gen_connection.erl @@ -32,6 +32,7 @@ %% connection.block, connection.unblock handler block_handler, blocked_by = sets:new([{version, 2}]), + queue_types_published = sets:new([{version, 2}]), closing = false %% #closing{} | false }). @@ -214,7 +215,7 @@ handle_cast({register_blocked_handler, HandlerPid}, {noreply, State1}; handle_cast({conserve_resources, Source, Conserve}, #state{blocked_by = BlockedBy} = State) -> - WasNotBlocked = sets:is_empty(BlockedBy), + WasBlocked = should_block(State), BlockedBy1 = case Conserve of true -> sets:add_element(Source, BlockedBy); @@ -222,10 +223,22 @@ handle_cast({conserve_resources, Source, Conserve}, sets:del_element(Source, BlockedBy) end, State1 = State#state{blocked_by = BlockedBy1}, - case sets:is_empty(BlockedBy1) of + case should_block(State1) of true -> handle_method(#'connection.unblocked'{}, State1); - false when WasNotBlocked -> + false when not WasBlocked -> + handle_method(#'connection.blocked'{}, State1); + false -> + {noreply, State1} + end; +handle_cast({channel_published_to_queue_type, _ChPid, QT}, + #state{queue_types_published = QTs} = State) -> + WasBlocked = should_block(State), + State1 = State#state{queue_types_published = sets:add_element(QT, QTs)}, + case should_block(State1) of + true -> + handle_method(#'connection.unblocked'{}, State1); + false when not WasBlocked -> handle_method(#'connection.blocked'{}, State1); false -> {noreply, State1} @@ -274,6 +287,13 @@ i(Item, #state{module = Mod, module_state = MState}) -> Mod:i(Item, MState). register_blocked_handler(Pid, HandlerPid) -> gen_server:cast(Pid, {register_blocked_handler, HandlerPid}). +should_block(#state{blocked_by = BlockedBy, queue_types_published = QTs}) -> + lists:any(fun ({disk, QT}) -> + sets:is_element(QT, QTs); + (_Resource) -> + true + end, sets:to_list(BlockedBy)). + %%--------------------------------------------------------------------------- %% Command handling %%--------------------------------------------------------------------------- diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 1b4d43593ceb..09c1a9e5872d 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -1238,6 +1238,87 @@ fun(Conf) -> end end}. +%% Tuning of disk monitor polling parameters +{mapping, "disk_monitor.fast_rate", "rabbit.disk_monitor_fast_rate", [ + %% Unit: KB/second, for example 250_000 for 250MB/sec. + {datatype, [integer]} +]}. +{mapping, "disk_monitor.min_interval", "rabbit.disk_monitor_min_interval", [ + %% Unit: milliseconds. + {datatype, [integer]} +]}. +{mapping, "disk_monitor.max_interval", "rabbit.disk_monitor_max_interval", [ + %% Unit: milliseconds. + {datatype, [integer]} +]}. + +%% Per-queue-type / per-mount disk alarms +{mapping, "disk_free_limits.$num.name", "rabbit.disk_free_limits", [ + {datatype, [binary]} +]}. +{mapping, "disk_free_limits.$num.mount", "rabbit.disk_free_limits", [ + {datatype, [string]} +]}. +{mapping, "disk_free_limits.$num.limit", "rabbit.disk_free_limits", [ + {datatype, [integer, string]}, + {validators, ["is_supported_information_unit"]} +]}. +{mapping, "disk_free_limits.$num.queue_types", "rabbit.disk_free_limits", [ + {datatype, [binary]} +]}. + +{translation, "rabbit.disk_free_limits", +fun(Conf) -> + case cuttlefish_variable:filter_by_prefix("disk_free_limits", Conf) of + [] -> + cuttlefish:unset(); + Settings -> + Ls = lists:foldl( + fun ({["disk_free_limits", Num, Key0], Value0}, Acc) -> + Idx = case string:to_integer(Num) of + {N, []} -> N; + _ -> cuttlefish:invalid(lists:flatten(io_lib:format("~p could not be parsed as a number", [Num]))) + end, + Key = case Key0 of + "name" -> name; + "mount" -> mount; + "limit" -> limit; + "queue_types" -> queue_types; + _ -> cuttlefish:invalid(lists:flatten(io_lib:format("~p is invalid", [Key0]))) + end, + Value = case Key of + queue_types -> string:split(Value0, ","); + _ -> Value0 + end, + maps:update_with( + Idx, + fun (#{Key := ExistingValue} = Limit) -> + cuttlefish:warn( + io_lib:format("Disk limit ~b has duplicate setting ~ts, " + "using ~tp instead of ~tp", + [Idx, Key, Value, ExistingValue])), + Limit#{Key := Value}; + (Limit) -> + Limit#{Key => Value} + end, #{Key => Value}, Acc); + (Other, _Acc) -> + cuttlefish:invalid( + lists:flatten(io_lib:format("~p is invalid", [Other]))) + end, #{}, Settings), + maps:fold( + fun(_Idx, #{name := Name}, Names) -> + case sets:is_element(Name, Names) of + true -> + cuttlefish:invalid( + lists:flatten(io_lib:format("name ~ts is used by multiple mounts", [Name]))); + false -> + sets:add_element(Name, Names) + end + end, sets:new([{version, 2}]), Ls), + Ls + end +end}. + %% %% Clustering %% ===================== @@ -2620,6 +2701,20 @@ end}. {datatype, {enum, [true, false]}} ]}. +%% Classic queue data directory +{mapping, "classic_queue.data_dir", "rabbit.classic_queue_data_dir", [ + {datatype, string} +]}. + +{translation, "rabbit.classic_queue_data_dir", + fun(Conf) -> + case cuttlefish:conf_get("classic_queue.data_dir", Conf, undefined) of + undefined -> cuttlefish:unset(); + Val -> Val + end + end +}. + %% %% Backing queue version %% @@ -2776,6 +2871,19 @@ fun(Conf) -> end end}. +{mapping, "stream.data_dir", "osiris.data_dir", [ + {datatype, string} +]}. + +{translation, "osiris.data_dir", + fun(Conf) -> + case cuttlefish:conf_get("stream.data_dir", Conf, undefined) of + undefined -> cuttlefish:unset(); + Val -> Val + end + end +}. + {mapping, "stream.read_ahead", "rabbit.stream_read_ahead", [{datatype, {enum, [true, false]}}]}. diff --git a/deps/rabbit/src/rabbit_alarm.erl b/deps/rabbit/src/rabbit_alarm.erl index 879fdae81896..f4af0dae2d78 100644 --- a/deps/rabbit/src/rabbit_alarm.erl +++ b/deps/rabbit/src/rabbit_alarm.erl @@ -24,6 +24,7 @@ -export([start_link/0, start/0, stop/0, register/2, set_alarm/1, clear_alarm/1, get_alarms/0, get_alarms/1, get_local_alarms/0, get_local_alarms/1, on_node_up/1, on_node_down/1, + format_resource_alarm_source/1, format_as_map/1, format_as_maps/1, is_local/1]). -export([init/1, handle_call/2, handle_event/2, handle_info/2, @@ -42,13 +43,16 @@ %%---------------------------------------------------------------------------- --record(alarms, {alertees :: dict:dict(pid(), rabbit_types:mfargs()), - alarmed_nodes :: dict:dict(node(), [resource_alarm_source()]), - alarms :: [alarm()]}). +-record(alarms, {alertees = #{} :: #{pid() => rabbit_types:mfargs()}, + alarmed_nodes = #{} :: #{node() => [resource_alarm_source()]}, + alarms = [] :: [alarm()]}). -export_type([alarm/0]). -type local_alarm() :: 'file_descriptor_limit'. --type resource_alarm_source() :: 'disk' | 'memory'. +-type resource_alarm_source() :: + memory + | disk + | {disk, rabbit_queue_type:queue_type()}. -type resource_alarm() :: {resource_limit, resource_alarm_source(), node()}. -type alarm() :: local_alarm() | resource_alarm(). -type resource_alert() :: {WasAlarmSetForNode :: boolean(), @@ -90,7 +94,7 @@ stop() -> ok. %% called like this: `apply(M, F, A ++ [Pid, Source, Alert])', where `Source' %% has the type of resource_alarm_source() and `Alert' has the type of resource_alert(). --spec register(pid(), rabbit_types:mfargs()) -> [atom()]. +-spec register(pid(), rabbit_types:mfargs()) -> [resource_alarm_source()]. register(Pid, AlertMFA) -> gen_event:call(?SERVER, ?MODULE, {register, Pid, AlertMFA}, infinity). @@ -124,25 +128,25 @@ is_local({file_descriptor_limit, _}) -> true; is_local({{resource_limit, _Resource, Node}, _}) when Node =:= node() -> true; is_local({{resource_limit, _Resource, Node}, _}) when Node =/= node() -> false. +-spec format_resource_alarm_source(resource_alarm_source()) -> iodata(). +format_resource_alarm_source(disk) -> + ?DISK_SPACE_RESOURCE; +format_resource_alarm_source({disk, QueueType}) -> + io_lib:format("disk for queue type '~ts'", [QueueType]); +format_resource_alarm_source(memory) -> + ?MEMORY_RESOURCE; +format_resource_alarm_source(Unknown) -> + io_lib:format("~w", [Unknown]). + -spec format_as_map(alarm()) -> #{binary() => term()}. format_as_map(file_descriptor_limit) -> #{ <<"resource">> => ?FILE_DESCRIPTOR_RESOURCE, <<"node">> => node() }; -format_as_map({resource_limit, disk, Node}) -> - #{ - <<"resource">> => ?DISK_SPACE_RESOURCE, - <<"node">> => Node - }; -format_as_map({resource_limit, memory, Node}) -> - #{ - <<"resource">> => ?MEMORY_RESOURCE, - <<"node">> => Node - }; format_as_map({resource_limit, Limit, Node}) -> #{ - <<"resource">> => rabbit_data_coercion:to_binary(Limit), + <<"resource">> => iolist_to_binary(format_resource_alarm_source(Limit)), <<"node">> => Node }. @@ -177,12 +181,10 @@ remote_conserve_resources(Pid, Source, {false, _, _}) -> %%---------------------------------------------------------------------------- init([]) -> - {ok, #alarms{alertees = dict:new(), - alarmed_nodes = dict:new(), - alarms = []}}. + {ok, #alarms{}}. handle_call({register, Pid, AlertMFA}, State = #alarms{alarmed_nodes = AN}) -> - {ok, lists:usort(lists:append([V || {_, V} <- dict:to_list(AN)])), + {ok, lists:usort(lists:append(maps:values(AN))), internal_register(Pid, AlertMFA, State)}; handle_call(get_alarms, State) -> @@ -236,14 +238,11 @@ handle_event({node_up, Node}, State) -> {ok, State}; handle_event({node_down, Node}, #alarms{alarmed_nodes = AN} = State) -> - AlarmsForDeadNode = case dict:find(Node, AN) of - {ok, V} -> V; - error -> [] - end, + AlarmsForDeadNode = maps:get(Node, AN, []), {ok, lists:foldr(fun(Source, AccState) -> ?LOG_WARNING("~ts resource limit alarm cleared for dead node ~tp", - [Source, Node]), - maybe_alert(fun dict_unappend/3, Node, Source, false, AccState) + [format_resource_alarm_source(Source), Node]), + maybe_alert(fun map_unappend/3, Node, Source, false, AccState) end, State, AlarmsForDeadNode)}; handle_event({register, Pid, AlertMFA}, State) -> @@ -254,7 +253,7 @@ handle_event(_Event, State) -> handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #alarms{alertees = Alertees}) -> - {ok, State#alarms{alertees = dict:erase(Pid, Alertees)}}; + {ok, State#alarms{alertees = maps:remove(Pid, Alertees)}}; handle_info(_Info, State) -> {ok, State}. @@ -267,22 +266,14 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -dict_append(Key, Val, Dict) -> - L = case dict:find(Key, Dict) of - {ok, V} -> V; - error -> [] - end, - dict:store(Key, lists:usort([Val|L]), Dict). - -dict_unappend(Key, Val, Dict) -> - L = case dict:find(Key, Dict) of - {ok, V} -> V; - error -> [] - end, +map_append(Key, Val, Map) -> + maps:update_with(Key, fun(Vs) -> [Val | Vs] end, [Val], Map). +map_unappend(Key, Val, Map) -> + L = maps:get(Key, Map, []), case lists:delete(Val, L) of - [] -> dict:erase(Key, Dict); - X -> dict:store(Key, X, Dict) + [] -> maps:remove(Key, Map); + X -> Map#{Key := X} end. maybe_alert(UpdateFun, Node, Source, WasAlertAdded, @@ -290,10 +281,14 @@ maybe_alert(UpdateFun, Node, Source, WasAlertAdded, alertees = Alertees}) -> AN1 = UpdateFun(Node, Source, AN), %% Is alarm for Source still set on any node? - StillHasAlerts = lists:any(fun ({_Node, NodeAlerts}) -> lists:member(Source, NodeAlerts) end, dict:to_list(AN1)), + StillHasAlerts = rabbit_misc:maps_any( + fun(_Node, NodeAlerts) -> + lists:member(Source, NodeAlerts) + end, AN1), case StillHasAlerts of true -> ok; - false -> ?LOG_WARNING("~ts resource limit alarm cleared across the cluster", [Source]) + false -> ?LOG_WARNING("~ts resource limit alarm cleared across the cluster", + [format_resource_alarm_source(Source)]) end, Alert = {WasAlertAdded, StillHasAlerts, Node}, case node() of @@ -311,22 +306,24 @@ alert_remote(Alert, Alertees, Source) -> alert(Alertees, Source, Alert, NodeComparator) -> Node = node(), - dict:fold(fun (Pid, {M, F, A}, ok) -> - case NodeComparator(Node, node(Pid)) of - true -> apply(M, F, A ++ [Pid, Source, Alert]); - false -> ok - end - end, ok, Alertees). + maps:foreach(fun (Pid, {M, F, A}) -> + case NodeComparator(Node, node(Pid)) of + true -> apply(M, F, A ++ [Pid, Source, Alert]); + false -> ok + end + end, Alertees). internal_register(Pid, {M, F, A} = AlertMFA, State = #alarms{alertees = Alertees}) -> _MRef = erlang:monitor(process, Pid), - _ = case dict:find(node(), State#alarms.alarmed_nodes) of - {ok, Sources} -> [apply(M, F, A ++ [Pid, R, {true, true, node()}]) || R <- Sources]; - error -> ok + Node = node(), + _ = case State#alarms.alarmed_nodes of + #{Node := Sources} -> + [apply(M, F, A ++ [Pid, R, {true, true, node()}]) || R <- Sources]; + _ -> + ok end, - NewAlertees = dict:store(Pid, AlertMFA, Alertees), - State#alarms{alertees = NewAlertees}. + State#alarms{alertees = Alertees#{Pid => AlertMFA}}. handle_set_resource_alarm(Source, Node, State) -> ?LOG_WARNING( @@ -334,8 +331,8 @@ handle_set_resource_alarm(Source, Node, State) -> "**********************************************************~n" "*** Publishers will be blocked until this alarm clears ***~n" "**********************************************************~n", - [Source, Node]), - {ok, maybe_alert(fun dict_append/3, Node, Source, true, State)}. + [format_resource_alarm_source(Source), Node]), + {ok, maybe_alert(fun map_append/3, Node, Source, true, State)}. handle_set_alarm({file_descriptor_limit, []}, State) -> ?LOG_WARNING( @@ -350,8 +347,8 @@ handle_set_alarm(Alarm, State) -> handle_clear_resource_alarm(Source, Node, State) -> ?LOG_WARNING("~ts resource limit alarm cleared on node ~tp", - [Source, Node]), - {ok, maybe_alert(fun dict_unappend/3, Node, Source, false, State)}. + [format_resource_alarm_source(Source), Node]), + {ok, maybe_alert(fun map_unappend/3, Node, Source, false, State)}. handle_clear_alarm(file_descriptor_limit, State) -> ?LOG_WARNING("file descriptor limit alarm cleared~n"), @@ -361,14 +358,14 @@ handle_clear_alarm(Alarm, State) -> {ok, State}. is_node_alarmed(Source, Node, #alarms{alarmed_nodes = AN}) -> - case dict:find(Node, AN) of - {ok, Sources} -> + case AN of + #{Node := Sources} -> lists:member(Source, Sources); - error -> + _ -> false end. compute_alarms(#alarms{alarms = Alarms, alarmed_nodes = AN}) -> Alarms ++ [ {{resource_limit, Source, Node}, []} - || {Node, Sources} <- dict:to_list(AN), Source <- Sources ]. + || Node := Sources <- AN, Source <- Sources ]. diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 3c7c865fcb00..162b1d7c520d 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -170,7 +170,9 @@ interceptor_state, queue_states, tick_timer, - publishing_mode = false :: boolean() + publishing_mode = false :: boolean(), + queue_types_published = sets:new([{version, 2}]) :: + sets:set(rabbit_queue_type:queue_type()) }). -define(QUEUE, lqueue). @@ -2097,9 +2099,10 @@ deliver_to_queues(XName, ok = process_routing_mandatory(Mandatory, RoutedToQueues, Message, XName, State0), MsgSeqNo = maps:get(correlation, Options, undefined), State1 = process_routing_confirm(MsgSeqNo, QueueNames, XName, State0), + State2 = notify_published_queue_types(Qs, State1), %% Actions must be processed after registering confirms as actions may %% contain rejections of publishes - State = handle_queue_actions(Actions, State1#ch{queue_states = QueueStates}), + State = handle_queue_actions(Actions, State2#ch{queue_states = QueueStates}), case rabbit_event:stats_level(State, #ch.stats_timer) of fine -> ?INCR_STATS(exchange_stats, XName, 1, publish), @@ -2165,6 +2168,27 @@ process_routing_confirm(MsgSeqNo, QRefs, XName, State) State#ch{unconfirmed = rabbit_confirms:insert(MsgSeqNo, QRefs, XName, State#ch.unconfirmed)}. +notify_published_queue_types(Qs, #ch{cfg = #conf{conn_pid = ConnPid}, + queue_types_published = QTs0} = State0) -> + QTs = lists:foldl( + fun(Q0, Acc) -> + Q = case Q0 of + {Q1, _RouteInfo} -> Q1; + _ -> Q0 + end, + QT = amqqueue:get_type(Q), + case sets:is_element(QT, Acc) of + true -> + Acc; + false -> + gen_server:cast( + ConnPid, + {channel_published_to_queue_type, self(), QT}), + sets:add_element(QT, Acc) + end + end, QTs0, Qs), + State0#ch{queue_types_published = QTs}. + confirm(MsgSeqNos, QRef, State = #ch{unconfirmed = UC}) -> %% NOTE: if queue name does not exist here it's likely that the ref also %% does not exist in unconfirmed messages. diff --git a/deps/rabbit/src/rabbit_disk_monitor.erl b/deps/rabbit/src/rabbit_disk_monitor.erl index 292bce853d79..9d78e953c4e6 100644 --- a/deps/rabbit/src/rabbit_disk_monitor.erl +++ b/deps/rabbit/src/rabbit_disk_monitor.erl @@ -15,11 +15,6 @@ %% watermark (configurable either as an absolute value or %% relative to the memory limit). %% -%% Disk monitoring is done by shelling out to /usr/bin/df -%% instead of related built-in OTP functions because currently -%% this is the most reliable way of determining free disk space -%% for the partition our internal database is on. -%% %% Update interval is dynamically calculated assuming disk %% space is being filled at FAST_RATE. @@ -31,25 +26,39 @@ terminate/2, code_change/3]). -export([get_disk_free_limit/0, set_disk_free_limit/1, + set_disk_free_limit/2, get_min_check_interval/0, set_min_check_interval/1, get_max_check_interval/0, set_max_check_interval/1, - get_disk_free/0, set_enabled/1]). + get_disk_free/0, get_mount_free/0, set_enabled/1]). -define(SERVER, ?MODULE). -define(ETS_NAME, ?MODULE). +-define(MOUNT_ETS_NAME, rabbit_disk_monitor_per_mount). -define(DEFAULT_MIN_DISK_CHECK_INTERVAL, 100). -define(DEFAULT_MAX_DISK_CHECK_INTERVAL, 10000). -define(DEFAULT_DISK_FREE_LIMIT, 50000000). -%% 250MB/s i.e. 250kB/ms --define(FAST_RATE, (250 * 1000)). + +-record(mount, + {%% name set in configuration + name :: binary(), + %% number set in configuration, used to order the disks in the UI + precedence :: integer(), + %% minimum bytes available + limit :: non_neg_integer(), + %% detected available disk space in bytes + available = 'NaN' :: non_neg_integer() | 'NaN', + %% set of queue types which should be blocked if the limit is exceeded + queue_types :: sets:set(rabbit_queue_type:queue_type())}). -record(state, { - %% monitor partition on which this directory resides + %% monitor partition on which the data directory resides dir, %% configured limit in bytes limit, %% last known free disk space amount in bytes actual, + %% extra file systems to monitor mapped to the queue types to + mounts = #{} :: mounts(), %% minimum check interval min_interval, %% maximum check interval @@ -65,17 +74,26 @@ %% on start-up retries, %% Interval between retries - interval, - %% Operating system in use - os, - %% Port running sh to execute df commands - port + interval }). %%---------------------------------------------------------------------------- -type disk_free_limit() :: integer() | {'absolute', integer()} | string() | {'mem_relative', float() | integer()}. +-type mounts() :: #{file:filename() => #mount{}}. + +%%---------------------------------------------------------------------------- + +%% This needs to wait until the recovery phase so that queue types have a +%% chance to register themselves. +-rabbit_boot_step({monitor_mounts, + [{description, "monitor per-queue-type mounts"}, + {mfa, {gen_server, call, + [?MODULE, monitor_mounts]}}, + {requires, recovery}, + {enables, routing_ready}]}). + %%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- @@ -88,6 +106,11 @@ get_disk_free_limit() -> set_disk_free_limit(Limit) -> gen_server:call(?MODULE, {set_disk_free_limit, Limit}). +-spec set_disk_free_limit(MountName :: binary(), integer()) -> 'ok'. +set_disk_free_limit(MountName, Limit) + when is_binary(MountName) andalso is_integer(Limit) -> + gen_server:call(?MODULE, {set_disk_free_limit, MountName, Limit}). + -spec get_min_check_interval() -> integer(). get_min_check_interval() -> safe_ets_lookup(min_check_interval, ?DEFAULT_MIN_DISK_CHECK_INTERVAL). @@ -108,6 +131,28 @@ set_max_check_interval(Interval) -> get_disk_free() -> safe_ets_lookup(disk_free, 'NaN'). +-spec get_mount_free() -> + [#{name := binary(), + available := non_neg_integer() | 'NaN', + limit := pos_integer()}]. +get_mount_free() -> + Ms0 = try + ets:tab2list(?MOUNT_ETS_NAME) + catch + error:badarg -> + [] + end, + Ms = lists:sort( + fun(#mount{precedence = A}, #mount{precedence = B}) -> + %% ascending + A < B + end, Ms0), + [#{name => Name, + available => Available, + limit => Limit} || #mount{name = Name, + available = Available, + limit = Limit} <- Ms]. + -spec set_enabled(string()) -> 'ok'. set_enabled(Enabled) -> gen_server:call(?MODULE, {set_enabled, Enabled}). @@ -121,31 +166,26 @@ start_link(Args) -> gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). init([Limit]) -> - Dir = dir(), {ok, Retries} = application:get_env(rabbit, disk_monitor_failure_retries), {ok, Interval} = application:get_env(rabbit, disk_monitor_failure_retry_interval), + MinInterval = application:get_env(rabbit, disk_monitor_min_interval, + ?DEFAULT_MIN_DISK_CHECK_INTERVAL), + MaxInterval = application:get_env(rabbit, disk_monitor_max_interval, + ?DEFAULT_MAX_DISK_CHECK_INTERVAL), ?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]), - State0 = #state{dir = Dir, - alarmed = false, + ?MOUNT_ETS_NAME = ets:new(?MOUNT_ETS_NAME, [protected, set, named_table, + {keypos, #mount.name}]), + State0 = #state{alarmed = false, enabled = true, limit = Limit, retries = Retries, interval = Interval}, - State1 = set_min_check_interval(?DEFAULT_MIN_DISK_CHECK_INTERVAL, State0), - State2 = set_max_check_interval(?DEFAULT_MAX_DISK_CHECK_INTERVAL, State1), + State1 = set_min_check_interval(MinInterval, State0), + State2 = set_max_check_interval(MaxInterval, State1), - OS = os:type(), - Port = case OS of - {unix, _} -> - start_portprogram(); - {win32, _OSname} -> - not_used - end, - State3 = State2#state{port=Port, os=OS}, + State3 = enable(State2), - State4 = enable(State3), - - {ok, State4}. + {ok, State3}. handle_call({set_disk_free_limit, _}, _From, #state{enabled = false} = State) -> ?LOG_INFO("Cannot set disk free limit: " @@ -155,6 +195,23 @@ handle_call({set_disk_free_limit, _}, _From, #state{enabled = false} = State) -> handle_call({set_disk_free_limit, Limit}, _From, State) -> {reply, ok, set_disk_limits(State, Limit)}; +handle_call({set_disk_free_limit, Name, Limit}, _From, + #state{mounts = Mounts0} = State) -> + MatchingMount = lists:search( + fun({_Path, #mount{name = N}}) -> + Name =:= N + end, maps:to_list(Mounts0)), + case MatchingMount of + {value, {Path, Mount}} -> + ?LOG_INFO("Updated disk free limit of mount '~ts'", [Name]), + Mounts = Mounts0#{Path := Mount#mount{limit = Limit}}, + {reply, ok, State#state{mounts = Mounts}}; + false -> + ?LOG_WARNING("Cannot set disk free limit for mount '~ts' since " + "the name does not match any known mounts.", [Name]), + {reply, ok, State} + end; + handle_call(get_max_check_interval, _From, State) -> {reply, State#state.max_interval, State}; @@ -184,6 +241,15 @@ handle_call({set_enabled, _Enabled = false}, _From, State = #state{enabled = fal ?LOG_INFO("Free disk space monitor was already disabled"), {reply, ok, State#state{enabled = false}}; +handle_call(monitor_mounts, _From, State) -> + case State of + #state{enabled = true} -> + State1 = State#state{mounts = mounts()}, + {reply, ok, internal_update(State1)}; + #state{enabled = false} -> + {reply, ok, State} + end; + handle_call(_Request, _From, State) -> {noreply, State}. @@ -210,43 +276,6 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%---------------------------------------------------------------------------- -start_portprogram() -> - Args = ["-s", "rabbit_disk_monitor"], - Opts = [stream, stderr_to_stdout, {args, Args}], - erlang:open_port({spawn_executable, "/bin/sh"}, Opts). - -run_port_cmd(Cmd0, Port) -> - %% Insert a carriage return, ^M or ASCII 13, after the command, - %% to indicate end of output - Cmd1 = io_lib:format("~ts < /dev/null; echo \"\^M\"~n", [Cmd0]), - Cmd2 = rabbit_data_coercion:to_utf8_binary(Cmd1), - Port ! {self(), {command, [Cmd2, 10]}}, % The 10 at the end is a newline - get_reply(Port, []). - -get_reply(Port, O) -> - receive - {Port, {data, N}} -> - case newline(N, O) of - {ok, Str} -> - Str; - {more, Acc} -> - get_reply(Port, Acc) - end; - {'EXIT', Port, Reason} -> - exit({port_died, Reason}) - end. - -% Character 13 is ^M or carriage return -newline([13|_], B) -> - {ok, lists:reverse(B)}; -newline([H|T], B) -> - newline(T, [H|B]); -newline([], B) -> - {more, B}. - -find_cmd(Cmd) -> - os:find_executable(Cmd). - safe_ets_lookup(Key, Default) -> try case ets:lookup(?ETS_NAME, Key) of @@ -260,9 +289,6 @@ safe_ets_lookup(Key, Default) -> Default end. -% the partition / drive containing this directory will be monitored -dir() -> rabbit:data_dir(). - set_min_check_interval(MinInterval, State) -> ets:insert(?ETS_NAME, {min_check_interval, MinInterval}), State#state{min_interval = MinInterval}. @@ -279,125 +305,110 @@ set_disk_limits(State, Limit0) -> ets:insert(?ETS_NAME, {disk_free_limit, Limit}), internal_update(State1). -internal_update(State = #state{limit = Limit, - dir = Dir, - alarmed = Alarmed, - os = OS, - port = Port}) -> - CurrentFree = get_disk_free(Dir, OS, Port), +internal_update(#state{limit = DataDirLimit, + dir = Dir, + mounts = Mounts, + alarmed = Alarmed} = State) -> + DiskFree = get_disk_free(State), + DataDirFree = maps:get(Dir, DiskFree, 'NaN'), %% note: 'NaN' is considered to be less than a number - NewAlarmed = CurrentFree < Limit, + NewAlarmed = DataDirFree < DataDirLimit, case {Alarmed, NewAlarmed} of {false, true} -> - emit_update_info("insufficient", CurrentFree, Limit), + emit_update_info("insufficient", DataDirFree, DataDirLimit), rabbit_alarm:set_alarm({{resource_limit, disk, node()}, []}); {true, false} -> - emit_update_info("sufficient", CurrentFree, Limit), + emit_update_info("sufficient", DataDirFree, DataDirLimit), rabbit_alarm:clear_alarm({resource_limit, disk, node()}); _ -> ok end, - ets:insert(?ETS_NAME, {disk_free, CurrentFree}), - State#state{alarmed = NewAlarmed, actual = CurrentFree}. - -get_disk_free(Dir, {unix, Sun}, Port) - when Sun =:= sunos; Sun =:= sunos4; Sun =:= solaris -> - Df = find_cmd("df"), - parse_free_unix(run_port_cmd(Df ++ " -k '" ++ Dir ++ "'", Port)); -get_disk_free(Dir, {unix, _}, Port) -> - Df = find_cmd("df"), - parse_free_unix(run_port_cmd(Df ++ " -kP '" ++ Dir ++ "'", Port)); -get_disk_free(Dir, {win32, _}, not_used) -> - % Dir: - % "c:/Users/username/AppData/Roaming/RabbitMQ/db/rabbit2@username-z01-mnesia" - case win32_get_drive_letter(Dir) of - error -> - ?LOG_WARNING("Expected the mnesia directory absolute " - "path to start with a drive letter like " - "'C:'. The path is: '~tp'", [Dir]), - {ok, Free} = win32_get_disk_free_dir(Dir), - Free; - DriveLetter -> - % Note: yes, "$\s" is the $char sequence for an ASCII space - F = fun([D, $:, $\\, $\s | _]) when D =:= DriveLetter -> - true; - (_) -> false - end, - % Note: we can use os_mon_sysinfo:get_disk_info/1 after the following is fixed: - % https://github.com/erlang/otp/issues/6156 - try - % Note: DriveInfoStr is in this format - % "C:\\ DRIVE_FIXED 720441434112 1013310287872 720441434112\n" - Lines = os_mon_sysinfo:get_disk_info(), - [DriveInfoStr] = lists:filter(F, Lines), - [DriveLetter, $:, $\\, $\s | DriveInfo] = DriveInfoStr, - - % https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-getdiskfreespaceexa - % lib/os_mon/c_src/win32sysinfo.c: - % if (fpGetDiskFreeSpaceEx(drive,&availbytes,&totbytes,&totbytesfree)){ - % sprintf(answer,"%s DRIVE_FIXED %I64u %I64u %I64u\n",drive,availbytes,totbytes,totbytesfree); - ["DRIVE_FIXED", FreeBytesAvailableToCallerStr, - _TotalNumberOfBytesStr, _TotalNumberOfFreeBytesStr] = string:tokens(DriveInfo, " "), - list_to_integer(FreeBytesAvailableToCallerStr) - catch _:{timeout, _}:_ -> - %% could not compute the result - 'NaN'; - _:Reason:_ -> - ?LOG_WARNING("Free disk space monitoring failed to retrieve the amount of available space: ~p", [Reason]), - %% could not compute the result - 'NaN' - end - end. + ets:insert(?ETS_NAME, {disk_free, DataDirFree}), + + NewMounts = maps:map( + fun(Path, M) -> + Available = maps:get(Path, DiskFree, 'NaN'), + M#mount{available = Available} + end, Mounts), + ets:insert(?MOUNT_ETS_NAME, [M || _Path := M <- NewMounts]), + + AlarmedMs = alarmed_mounts(Mounts), + AlarmedQTs = alarmed_queue_types(Mounts), + NewAlarmedMs = alarmed_mounts(NewMounts), + NewAlarmedQTs = alarmed_queue_types(NewMounts), + + NewlyClearedMs = sets:subtract(AlarmedMs, NewAlarmedMs), + NewlyClearedQTs = sets:subtract(AlarmedQTs, NewAlarmedQTs), + NewlyAlarmedMs = sets:subtract(NewAlarmedMs, AlarmedMs), + NewlyAlarmedQTs = sets:subtract(NewAlarmedQTs, AlarmedQTs), + + lists:foreach( + fun(Path) -> + #mount{name = Name, + limit = Limit, + available = Available} = maps:get(Path, NewMounts), + emit_update_info(Name, "insufficient", Available, Limit) + end, lists:sort(sets:to_list(NewlyAlarmedMs))), + lists:foreach( + fun(QT) -> + Alarm = {resource_limit, {disk, QT}, node()}, + rabbit_alarm:set_alarm({Alarm, []}) + end, lists:sort(sets:to_list(NewlyAlarmedQTs))), + lists:foreach( + fun(Path) -> + #mount{name = Name, + limit = Limit, + available = Available} = maps:get(Path, NewMounts), + emit_update_info(Name, "sufficient", Available, Limit) + end, lists:sort(sets:to_list(NewlyClearedMs))), + lists:foreach( + fun(QT) -> + Alarm = {resource_limit, {disk, QT}, node()}, + rabbit_alarm:clear_alarm(Alarm) + end, lists:sort(sets:to_list(NewlyClearedQTs))), + + State#state{alarmed = NewAlarmed, + actual = DataDirFree, + mounts = NewMounts}. -parse_free_unix(Str) -> - case string:tokens(Str, "\n") of - [_, S | _] -> case string:tokens(S, " \t") of - [_, _, _, Free | _] -> list_to_integer(Free) * 1024; - _ -> exit({unparseable, Str}) - end; - _ -> exit({unparseable, Str}) - end. - -win32_get_drive_letter([DriveLetter, $:, $/ | _]) when (DriveLetter >= $a andalso DriveLetter =< $z) -> - % Note: os_mon_sysinfo returns drives with uppercase letters, so uppercase it here - DriveLetter - 32; -win32_get_drive_letter([DriveLetter, $:, $/ | _]) when (DriveLetter >= $A andalso DriveLetter =< $Z) -> - DriveLetter; -win32_get_drive_letter(_) -> - error. - -win32_get_disk_free_dir(Dir) -> - %% On Windows, the Win32 API enforces a limit of 260 characters - %% (MAX_PATH). If we call `dir` with a path longer than that, it - %% fails with "File not found". Starting with Windows 10 version - %% 1607, this limit was removed, but the administrator has to - %% configure that. - %% - %% NTFS supports paths up to 32767 characters. Therefore, paths - %% longer than 260 characters exist but they are "inaccessible" to - %% `dir`. - %% - %% A workaround is to tell the Win32 API to not parse a path and - %% just pass it raw to the underlying filesystem. To do this, the - %% path must be prepended with "\\?\". That's what we do here. - %% - %% However, the underlying filesystem may not support forward - %% slashes transparently, as the Win32 API does. Therefore, we - %% convert all forward slashes to backslashes. - %% - %% See the following page to learn more about this: - %% https://ss64.com/nt/syntax-filenames.html - RawDir = "\\\\?\\" ++ string:replace(Dir, "/", "\\", all), - case run_os_cmd("dir /-C /W \"" ++ RawDir ++ "\"") of - {error, Error} -> - exit({unparseable, Error}); - CommandResult -> - LastLine0 = lists:last(string:tokens(CommandResult, "\r\n")), - LastLine1 = lists:reverse(LastLine0), - {match, [Free]} = re:run(LastLine1, "(\\d+)", - [{capture, all_but_first, list}]), - {ok, list_to_integer(lists:reverse(Free))} - end. +emit_update_info(StateStr, CurrentFree, Limit) -> + ?LOG_INFO( + "Free disk space is ~ts. Free bytes: ~b. Limit: ~b", + [StateStr, CurrentFree, Limit]). +emit_update_info(MountPoint, StateStr, CurrentFree, Limit) -> + ?LOG_INFO( + "Free space of disk '~ts' is ~ts. Free bytes: ~b. Limit: ~b", + [MountPoint, StateStr, CurrentFree, Limit]). + +-spec alarmed_mounts(mounts()) -> sets:set(file:filename()). +alarmed_mounts(Mounts) -> + maps:fold( + fun (Path, #mount{available = Available, + limit = Limit}, Acc) when Available < Limit -> + sets:add_element(Path, Acc); + (_Path, _Mount, Acc) -> + Acc + end, sets:new([{version, 2}]), Mounts). + +-spec alarmed_queue_types(mounts()) -> + sets:set(module()). +alarmed_queue_types(MountPoints) -> + maps:fold( + fun (_Path, #mount{available = Available, + limit = Limit, + queue_types = QTs}, Acc) when Available < Limit -> + sets:union(QTs, Acc); + (_Path, _Mount, Acc) -> + Acc + end, sets:new([{version, 2}]), MountPoints). + +-spec get_disk_free(#state{}) -> + #{file:filename() => AvailableB :: non_neg_integer()}. +get_disk_free(#state{dir = DataDir, mounts = Mounts}) -> + #{Mount => AvailableKiB * 1024 || + {Mount, Total, AvailableKiB, Capacity} <- disksup:get_disk_info(), + {Total, AvailableKiB, Capacity} =/= {0, 0, 0}, + Mount =:= DataDir orelse is_map_key(Mount, Mounts)}. interpret_limit({mem_relative, Relative}) when is_number(Relative) -> @@ -413,68 +424,145 @@ interpret_limit(Absolute) -> ?DEFAULT_DISK_FREE_LIMIT end. -emit_update_info(StateStr, CurrentFree, Limit) -> - ?LOG_INFO( - "Free disk space is ~ts. Free bytes: ~b. Limit: ~b", - [StateStr, CurrentFree, Limit]). - start_timer(State) -> State#state{timer = erlang:send_after(interval(State), self(), update)}. -interval(#state{alarmed = true, - max_interval = MaxInterval}) -> - MaxInterval; -interval(#state{actual = 'NaN', - max_interval = MaxInterval}) -> - MaxInterval; -interval(#state{limit = Limit, - actual = Actual, +interval(#state{actual = DataDirAvailable, + limit = DataDirLimit, + mounts = Mounts, min_interval = MinInterval, max_interval = MaxInterval}) -> - IdealInterval = 2 * (Actual - Limit) / ?FAST_RATE, + DataDirGap = case DataDirAvailable of + N when is_integer(N) -> + N - DataDirLimit; + _ -> + 1_000_000_000 + end, + SmallestGap = maps:fold( + fun (_Path, #mount{available = A, limit = L}, Min) + when is_integer(A) -> + erlang:min(A - L, Min); + (_Path, _Mount, Min) -> + Min + end, DataDirGap, Mounts), + IdealInterval = 2 * SmallestGap / fast_rate(), trunc(erlang:max(MinInterval, erlang:min(MaxInterval, IdealInterval))). +fast_rate() -> + %% 250MB/s i.e. 250kB/ms + application:get_env(rabbit, disk_monitor_fast_rate, 250_000). + +-spec mounts() -> mounts(). +mounts() -> + case application:get_env(rabbit, disk_free_limits) of + {ok, Limits} -> + maps:fold( + fun(Prec, #{name := Name, + mount := Path, + limit := Limit0, + queue_types := QTs0}, Acc) -> + Res = rabbit_resource_monitor_misc:parse_information_unit( + Limit0), + case Res of + {ok, Limit} -> + {Known, Unknown} = resolve_queue_types(QTs0), + case Unknown of + [_ | _] -> + ?LOG_WARNING( + "Unknown queue types configured for " + "disk '~ts': ~ts", + [Name, lists:join(", ", Unknown)]), + ok; + _ -> + ok + end, + case Known of + [] -> + ?LOG_ERROR("No known queue types " + "configured for disk '~ts'. " + "The disk will not be " + "monitored for free " + "disk space.", [Name]), + Acc; + _ -> + QTs = sets:from_list(Known, + [{version, 2}]), + Mount = #mount{name = Name, + precedence = Prec, + limit = Limit, + queue_types = QTs}, + Acc#{Path => Mount} + end; + {error, parse_error} -> + ?LOG_ERROR("Unable to parse free disk limit " + "'~ts' for disk '~ts'. The disk will " + "not be monitored for free space.", + [Limit0, Name]), + Acc + end + end, #{}, Limits); + undefined -> + #{} + end. + +resolve_queue_types(QTs) -> + resolve_queue_types(QTs, {[], []}). + +resolve_queue_types([], Acc) -> + Acc; +resolve_queue_types([QT | Rest], {Known, Unknown}) -> + case rabbit_registry:lookup_type_module(queue, QT) of + {ok, TypeModule} -> + resolve_queue_types(Rest, {[TypeModule | Known], Unknown}); + {error, not_found} -> + resolve_queue_types(Rest, {Known, [QT | Unknown]}) + end. + enable(#state{retries = 0} = State) -> ?LOG_ERROR("Free disk space monitor failed to start!"), State; -enable(#state{dir = Dir, os = OS, port = Port} = State) -> - enable_handle_disk_free(catch get_disk_free(Dir, OS, Port), State). - -enable_handle_disk_free(DiskFree, State) when is_integer(DiskFree) -> - enable_handle_total_memory(catch vm_memory_monitor:get_total_memory(), DiskFree, State); -enable_handle_disk_free(Error, #state{interval = Interval, retries = Retries} = State) -> - ?LOG_WARNING("Free disk space monitor encountered an error " - "(e.g. failed to parse output from OS tools). " - "Retries left: ~b Error:~n~tp", - [Retries, Error]), - erlang:send_after(Interval, self(), try_enable), - State#state{enabled = false}. - -enable_handle_total_memory(TotalMemory, DiskFree, #state{limit = Limit} = State) when is_integer(TotalMemory) -> - ?LOG_INFO("Enabling free disk space monitoring " - "(disk free space: ~b, total memory: ~b)", [DiskFree, TotalMemory]), - start_timer(set_disk_limits(State, Limit)); -enable_handle_total_memory(Error, _DiskFree, #state{interval = Interval, retries = Retries} = State) -> - ?LOG_WARNING("Free disk space monitor encountered an error " - "retrieving total memory. " - "Retries left: ~b Error:~n~tp", - [Retries, Error]), - erlang:send_after(Interval, self(), try_enable), - State#state{enabled = false}. - -run_os_cmd(Cmd) -> - Pid = self(), - Ref = make_ref(), - CmdFun = fun() -> - CmdResult = rabbit_misc:os_cmd(Cmd), - Pid ! {Pid, Ref, CmdResult} - end, - CmdPid = spawn(CmdFun), - receive - {Pid, Ref, CmdResult} -> - CmdResult - after 5000 -> - exit(CmdPid, kill), - ?LOG_ERROR("Command timed out: '~ts'", [Cmd]), - {error, timeout} +enable(#state{dir = undefined, + interval = Interval, + retries = Retries} = State) -> + case resolve_data_dir() of + {ok, MountPoint} -> + enable(State#state{dir = MountPoint}); + {error, Reason} -> + ?LOG_WARNING("Free disk space monitor encounter an error " + "resolving the data directory '~ts'. Retries left: " + "~b Error:~n~tp", + [rabbit:data_dir(), Retries, Reason]), + erlang:send_after(Interval, self(), try_enable), + State#state{enabled = false} + end; +enable(#state{dir = Dir, + retries = Retries, + interval = Interval, + limit = Limit} = State) -> + DiskFree = get_disk_free(State), + case vm_memory_monitor:get_total_memory() of + TotalMemory when is_integer(TotalMemory) -> + ?LOG_INFO("Enabling free disk space monitoring (data dir free " + "space: ~b, total memory: ~b)", + [maps:get(Dir, DiskFree, unknown), TotalMemory]), + start_timer(set_disk_limits(State, Limit)); + unknown -> + ?LOG_WARNING("Free disk space monitor could not determine total " + "memory. Retries left: ~b", [Retries]), + erlang:send_after(Interval, self(), try_enable), + State#state{enabled = false} + end. + +resolve_data_dir() -> + case disksup:get_disk_info(rabbit:data_dir()) of + [{"none", 0, 0, 0}] -> + {error, disksup_not_available}; + [{MountPoint, 0, 0, 0}] -> + {error, {cannot_determine_space, MountPoint}}; + [{MountPoint, _TotalKiB, _AvailableKiB, _Capacity}] -> + {ok, MountPoint}; + [] -> + {error, no_disk_info}; + [_ | _] = Infos -> + {error, {multiple_disks, length(Infos)}} end. diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index fe3e74c7b92c..0f5c517fd769 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -113,7 +113,14 @@ %% a set of the reasons why we are %% blocked: {resource, memory}, {resource, disk}. %% More reasons can be added in the future. - blocked_by, + blocked_by = sets:new([{version, 2}]) :: + sets:set(flow | {resource, + rabbit_alarm:resource_alarm_source()}), + %% the set of queue types which have been published to + %% by channels on this connection, used for per-queue + %% type disk alarm blocking + queue_types_published = #{} :: #{ChannelPid :: pid() => + sets:set(rabbit_queue_type:queue_type())}, %% true if received any publishes, false otherwise %% note that this will also be true when connection is %% already blocked @@ -335,7 +342,6 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) -> throttle = #throttle{ last_blocked_at = never, should_block = false, - blocked_by = sets:new([{version, 2}]), connection_blocked_message_sent = false }, proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock)}, @@ -677,6 +683,14 @@ handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) -> %% Ignore, we will emit a created event once we start running. State; +handle_other({'$gen_cast', {channel_published_to_queue_type, ChPid, QT}}, + #v1{throttle = Throttle0} = State0) -> + QTs = maps:update_with( + ChPid, fun(ChQTs) -> sets:add_element(QT, ChQTs) end, + sets:from_list([QT], [{version, 2}]), + Throttle0#throttle.queue_types_published), + Throttle = Throttle0#throttle{queue_types_published = QTs}, + control_throttle(State0#v1{throttle = Throttle}); handle_other(ensure_stats, State) -> ensure_stats_timer(State); handle_other(emit_stats, State) -> @@ -1007,14 +1021,21 @@ is_over_node_channel_limit() -> end end. -channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) -> +channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount, + throttle = Throttle0} = State) -> case get({ch_pid, ChPid}) of - undefined -> {undefined, State}; - {Channel, MRef} -> credit_flow:peer_down(ChPid), - erase({channel, Channel}), - erase({ch_pid, ChPid}), - erlang:demonitor(MRef, [flush]), - {Channel, State#v1{channel_count = ChannelCount - 1}} + undefined -> + {undefined, State}; + {Channel, MRef} -> + credit_flow:peer_down(ChPid), + erase({channel, Channel}), + erase({ch_pid, ChPid}), + erlang:demonitor(MRef, [flush]), + QT = maps:remove(ChPid, + Throttle0#throttle.queue_types_published), + Throttle = Throttle0#throttle{queue_types_published = QT}, + {Channel, State#v1{channel_count = ChannelCount - 1, + throttle = Throttle}} end. all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. @@ -1727,36 +1748,55 @@ send_error_on_channel0_and_close(Channel, Protocol, Reason, State) -> blocked_by_message(#throttle{blocked_by = Reasons}) -> %% we don't want to report internal flow as a reason here since %% it is entirely transient - Reasons1 = sets:del_element(flow, Reasons), - RStr = string:join([format_blocked_by(R) || R <- sets:to_list(Reasons1)], " & "), + RStr = lists:join([rabbit_alarm:format_resource_alarm_source(R) || + {resource, R} <- sets:to_list(Reasons)], + " & "), list_to_binary(rabbit_misc:format("low on ~ts", [RStr])). -format_blocked_by({resource, memory}) -> "memory"; -format_blocked_by({resource, disk}) -> "disk"; -format_blocked_by({resource, disc}) -> "disk". - update_last_blocked_at(Throttle) -> Throttle#throttle{last_blocked_at = erlang:monotonic_time()}. connection_blocked_message_sent( #throttle{connection_blocked_message_sent = BS}) -> BS. -should_send_blocked(Throttle = #throttle{blocked_by = Reasons}) -> +should_send_blocked(Throttle) -> should_block(Throttle) andalso - sets:size(sets:del_element(flow, Reasons)) =/= 0 + do_throttle_reasons_apply(Throttle) andalso not connection_blocked_message_sent(Throttle). -should_send_unblocked(Throttle = #throttle{blocked_by = Reasons}) -> +should_send_unblocked(Throttle) -> connection_blocked_message_sent(Throttle) andalso - sets:size(sets:del_element(flow, Reasons)) == 0. + not do_throttle_reasons_apply(Throttle). + +do_throttle_reasons_apply(#throttle{blocked_by = Reasons} = Throttle) -> + lists:any( + fun ({resource, disk}) -> + true; + ({resource, memory}) -> + true; + ({resource, {disk, QT}}) -> + has_published_to_queue_type(QT, Throttle); + (_) -> + %% Flow control should not send connection.blocked + false + end, sets:to_list(Reasons)). + +has_published_to_queue_type(QT, #throttle{queue_types_published = QTs}) -> + rabbit_misc:maps_any( + fun(_ChPid, ChQT) -> sets:is_element(QT, ChQT) end, QTs). %% Returns true if we have a reason to block %% this connection. -has_reasons_to_block(#throttle{blocked_by = Reasons}) -> - sets:size(Reasons) > 0. +has_reasons_to_block(#throttle{blocked_by = Reasons} = Throttle) -> + lists:any( + fun ({resource, {disk, QType}}) -> + has_published_to_queue_type(QType, Throttle); + (_) -> + true + end, sets:to_list(Reasons)). is_blocked_by_flow(#throttle{blocked_by = Reasons}) -> sets:is_element(flow, Reasons). diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index affe36296df7..02899f1827b7 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -126,7 +126,8 @@ [queue, <<"stream">>, ?MODULE]}}, {cleanup, {rabbit_registry, unregister, [queue, <<"stream">>]}}, - {requires, rabbit_registry} + {requires, rabbit_registry}, + {enables, recovery} ]}). -type client() :: #stream_client{}. diff --git a/deps/rabbit/src/rabbit_vhost.erl b/deps/rabbit/src/rabbit_vhost.erl index 7b08e3fec706..5d5363daa0c6 100644 --- a/deps/rabbit/src/rabbit_vhost.erl +++ b/deps/rabbit/src/rabbit_vhost.erl @@ -679,8 +679,13 @@ msg_store_dir_wildcard() -> rabbit_data_coercion:to_list(filename:join([msg_store_dir_base(), "*"])). msg_store_dir_base() -> - Dir = rabbit:data_dir(), - filename:join([Dir, "msg_stores", "vhosts"]). + case application:get_env(rabbit, classic_queue_data_dir) of + {ok, Dir} -> + Dir; + undefined -> + Dir = rabbit:data_dir(), + filename:join([Dir, "msg_stores", "vhosts"]) + end. config_file_path(VHost) -> VHostDir = msg_store_dir_path(VHost), diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index dc214a170766..c31787e5ca68 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -179,6 +179,15 @@ ssl_options.fail_if_no_peer_cert = true", "disk_free_limit.absolute = 2P", [{rabbit,[{disk_free_limit, "2P"}]}], []}, + %% Disk monitor polling + {disk_monitor_tune_polling_parameters, + "disk_monitor.fast_rate = 1000000 # 1 GB/sec + disk_monitor.min_interval = 50 + disk_monitor.max_interval = 20000", + [{rabbit,[{disk_monitor_fast_rate, 1_000_000}, + {disk_monitor_min_interval, 50}, + {disk_monitor_max_interval, 20_000}]}], + []}, {default_users, " @@ -467,6 +476,26 @@ tcp_listen_options.exit_on_close = false", "total_memory_available_override_value = 1024MB", [{rabbit,[{total_memory_available_override_value, "1024MB"}]}], []}, + {disk_free_limits_per_mount, + "disk_free_limits.1.name = messaging + disk_free_limits.1.mount = /data/queues + disk_free_limits.1.limit = 2GB + disk_free_limits.1.queue_types = classic,quorum + + disk_free_limits.2.name = streaming + disk_free_limits.2.mount = /data/streams + disk_free_limits.2.limit = 2GB + disk_free_limits.2.queue_types = stream", + [{rabbit,[{disk_free_limits, + #{1 => #{name => <<"messaging">>, + mount => "/data/queues", + limit => "2GB", + queue_types => [<<"classic">>, <<"quorum">>]}, + 2 => #{name => <<"streaming">>, + mount => "/data/streams", + limit => "2GB", + queue_types => [<<"stream">>]}}}]}], + []}, {ranch_connection_max, "ranch_connection_max = 999", [{rabbit,[{ranch_connection_max, 999}]}], @@ -1111,6 +1140,16 @@ credential_validator.regexp = ^abc\\d+", [], []}, + %% + %% Classic queue data dir + %% + {classic_queue_data_dir, + "classic_queue.data_dir = /data/rabbitmq/classic", + [{rabbit, [ + {classic_queue_data_dir, "/data/rabbitmq/classic"} + ]}], + []}, + %% %% Quorum queue %% @@ -1253,6 +1292,16 @@ credential_validator.regexp = ^abc\\d+", [{rabbit, [ {stream_read_ahead, false} ]}], + []}, + + %% + %% Stream data dir + %% + {stream_data_dir, + "stream.data_dir = /data/rabbitmq/stream", + [{osiris, [ + {data_dir, "/data/rabbitmq/stream"} + ]}], []} ]. diff --git a/deps/rabbit/test/unit_disk_monitor_SUITE.erl b/deps/rabbit/test/unit_disk_monitor_SUITE.erl index 3058cc904ebb..2a795f013765 100644 --- a/deps/rabbit/test/unit_disk_monitor_SUITE.erl +++ b/deps/rabbit/test/unit_disk_monitor_SUITE.erl @@ -9,9 +9,7 @@ -include_lib("eunit/include/eunit.hrl"). --compile(export_all). - --define(TIMEOUT, 30000). +-compile([nowarn_export_all, export_all]). all() -> [ diff --git a/deps/rabbit_common/src/rabbit_env.erl b/deps/rabbit_common/src/rabbit_env.erl index e3c551ae9c01..bc70a8e31280 100644 --- a/deps/rabbit_common/src/rabbit_env.erl +++ b/deps/rabbit_common/src/rabbit_env.erl @@ -306,7 +306,12 @@ context_to_app_env_vars1( [{kernel, inet_default_connect_options, [{nodelay, true}]}, {sasl, errlog_type, error}, {os_mon, start_cpu_sup, false}, - {os_mon, start_disksup, false}, + %% Start disksup but configure the threshold high enough that it will + %% never alarm. `disksup' must be started to call `get_disk_info/0,1' + %% and `get_disk_data/0' but we don't want it polluting the logs with + %% its alarms. Alarming is done by `rabbit_disk_monitor' instead. + {os_mon, start_disksup, true}, + {os_mon, disk_almost_full_threshold, 1.0}, {os_mon, start_memsup, false}, {mnesia, dir, DataDir}, {ra, data_dir, QuorumQueueDir}, diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/set_disk_free_limit_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/set_disk_free_limit_command.ex index efa36cf1bc4d..b4a987453f6c 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/set_disk_free_limit_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/set_disk_free_limit_command.ex @@ -24,6 +24,14 @@ defmodule RabbitMQ.CLI.Ctl.Commands.SetDiskFreeLimitCommand do {:validation_failure, :too_many_args} end + def validate(["mount" | _] = args, _) when length(args) < 3 do + {:validation_failure, :not_enough_args} + end + + def validate(["mount" | _] = args, _) when length(args) > 3 do + {:validation_failure, :too_many_args} + end + def validate([limit], _) do case Integer.parse(limit) do {_, ""} -> @@ -47,12 +55,41 @@ defmodule RabbitMQ.CLI.Ctl.Commands.SetDiskFreeLimitCommand do end end + def validate(["mount", _name, limit], _) do + case Integer.parse(limit) do + {_, ""} -> + :ok + + {limit_val, units} -> + case memory_unit_absolute(limit_val, units) do + scaled_limit when is_integer(scaled_limit) -> :ok + _ -> {:validation_failure, :bad_argument} + end + + _ -> + {:validation_failure, :bad_argument} + end + end + def validate([_ | rest], _) when length(rest) > 0 do {:validation_failure, :too_many_args} end use RabbitMQ.CLI.Core.RequiresRabbitAppRunning + def run(["mount", mount_name, limit], %{node: node_name}) do + limit = + case Integer.parse(limit) do + {limit, ""} -> limit + {limit, units} -> + case memory_unit_absolute(limit, units) do + scaled_limit when is_integer(scaled_limit) -> + scaled_limit + end + end + make_rpc_call(node_name, [mount_name, limit]) + end + def run(["mem_relative", _] = args, opts) do set_disk_free_limit_relative(args, opts) end @@ -70,6 +107,10 @@ defmodule RabbitMQ.CLI.Ctl.Commands.SetDiskFreeLimitCommand do use RabbitMQ.CLI.DefaultOutput + def banner(["mount", mount, limit], %{node: node_name}) do + "Setting disk free limit for mount #{mount} on #{node_name} to #{limit} ..." + end + def banner(["mem_relative", arg], %{node: node_name}) do "Setting disk free limit on #{node_name} to #{arg} times the total RAM ..." end @@ -77,12 +118,13 @@ defmodule RabbitMQ.CLI.Ctl.Commands.SetDiskFreeLimitCommand do def banner([arg], %{node: node_name}), do: "Setting disk free limit on #{node_name} to #{arg} bytes ..." - def usage, do: "set_disk_free_limit | mem_relative " + def usage, do: "set_disk_free_limit | mem_relative | mount " def usage_additional() do [ ["", "New limit as an absolute value with units, e.g. 1GB"], - ["mem_relative ", "New limit as a fraction of total memory reported by the OS"] + ["mem_relative ", "New limit as a fraction of total memory reported by the OS"], + ["mount ", "New limit for the given mount name as an absolute value with units, e.g. 1GB"] ] end diff --git a/deps/rabbitmq_management/priv/www/js/global.js b/deps/rabbitmq_management/priv/www/js/global.js index 60715bbb4497..e346bd75f020 100644 --- a/deps/rabbitmq_management/priv/www/js/global.js +++ b/deps/rabbitmq_management/priv/www/js/global.js @@ -141,7 +141,8 @@ var ALL_COLUMNS = {'Statistics': [['file_descriptors', 'File descriptors', true], ['erlang_processes', 'Erlang processes', true], ['memory', 'Memory', true], - ['disk_space', 'Disk space', true]], + ['disk_space', 'Disk space', true], + ['mount_space', 'Other disks', true]], 'General': [['uptime', 'Uptime', true], ['cores', 'Cores', true], ['info', 'Info', true], diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/node.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/node.ejs index e1739b9415fb..e67c71f1d205 100644 --- a/deps/rabbitmq_management/priv/www/js/tmpl/node.ejs +++ b/deps/rabbitmq_management/priv/www/js/tmpl/node.ejs @@ -131,6 +131,22 @@ <% } %> +<% + for (var i = 0; i < node.mount_stats.length; i++) { + var mount = node.mount_stats[i]; +%> + + + <%= fmt_string(mount.name) %> disk space + + +<%= node_stat_bar('available', 'limit', 'low watermark', mount, fmt_bytes_axis, + mount.available < mount.limit ? 'red' : 'green', + mount.available < mount.limit ? 'disk_free-alarm' : null, + true) %> + + +<% } %> diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/overview.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/overview.ejs index ac152cbfc67b..82e02f4bb5ab 100644 --- a/deps/rabbitmq_management/priv/www/js/tmpl/overview.ejs +++ b/deps/rabbitmq_management/priv/www/js/tmpl/overview.ejs @@ -93,6 +93,24 @@ <% if (show_column('overview', 'disk_space')) { %> Disk space <% } %> + <% if (show_column('overview', 'mount_space')) { %> + <% + var other_disk_names = []; + var unique_disk_names = {}; + for (var i = 0; i < nodes.length; i++) { + for (var j = 0; j < nodes[i].mount_stats.length; j++) { + var name = nodes[i].mount_stats[j].name; + if (!Object.hasOwnProperty(unique_disk_names, name)) { + unique_disk_names[name] = true; + other_disk_names.push(name); + } + } + } + %> + <% for (var i = 0; i < other_disk_names.length; i++) { %> + <%= fmt_string(other_disk_names[i]) %> disk space + <% } %> + <% } %> <% if (show_column('overview', 'uptime')) { %> Uptime <% } %> @@ -180,6 +198,20 @@ <% } %> <% } %> + <% if (show_column('overview', 'mount_space')) { %> + <% for (var i = 0; i < other_disk_names.length; i++) { + var mount = node.mount_stats.find((m) => m.name == other_disk_names[i]); + %> + + <% if (mount) { %> + <%= node_stat_bar('available', 'limit', 'low watermark', + mount, fmt_bytes_axis, + node.disk_free_alarm ? 'red' : 'green', + node.disk_free_alarm ? 'disk_free-alarm' : null, true) %> + <% } %> + + <% } %> + <% } %> <% if (show_column('overview', 'uptime')) { %> <%= fmt_uptime(node.uptime) %> <% } %> diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_db.erl b/deps/rabbitmq_management/src/rabbit_mgmt_db.erl index 890022a1aaba..f6fe6e96549e 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_db.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_db.erl @@ -661,7 +661,8 @@ node_stats(Ranges, Objs, Interval) -> StatsD = [{cluster_links, NodeNodeStats}], MgmtStats = maps:get(mgmt_stats, NData), Details = augment_details(Obj, []), % augmentation needs to be node local - combine(Props, Obj) ++ Details ++ Stats ++ StatsD ++ MgmtStats + MountStats = [{mount_stats, maps:get(mount_stats, NData, [])}], + combine(Props, Obj) ++ Details ++ Stats ++ StatsD ++ MgmtStats ++ MountStats end || Obj <- Objs]. combine(New, Old) -> diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_data.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_data.erl index 0e66b3e1de18..7cfc5572f120 100644 --- a/deps/rabbitmq_management_agent/src/rabbit_mgmt_data.erl +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_data.erl @@ -108,14 +108,15 @@ vhost_data(Ranges, Id) -> node_data(Ranges, Id) -> maps:from_list( - [{mgmt_stats, mgmt_queue_length_stats(Id)}] ++ - [{node_node_metrics, node_node_metrics()}] ++ - node_raw_detail_stats_data(Ranges, Id) ++ - [raw_message_data(node_coarse_stats, + [{mgmt_stats, mgmt_queue_length_stats(Id)}, + {node_node_metrics, node_node_metrics()}, + {node_stats, lookup_element(node_stats, Id)}, + {mount_stats, [maps:to_list(M) || M <- rabbit_disk_monitor:get_mount_free()]}, + raw_message_data(node_coarse_stats, pick_range(coarse_node_stats, Ranges), Id), raw_message_data(node_persister_stats, - pick_range(coarse_node_stats, Ranges), Id), - {node_stats, lookup_element(node_stats, Id)}] ++ + pick_range(coarse_node_stats, Ranges), Id)] ++ + node_raw_detail_stats_data(Ranges, Id) ++ node_connection_churn_rates_data(Ranges, Id)). overview_data(_Pid, User, Ranges, VHosts) -> diff --git a/deps/rabbitmq_prometheus/metrics.md b/deps/rabbitmq_prometheus/metrics.md index 7f61b0d3af94..980d82929250 100644 --- a/deps/rabbitmq_prometheus/metrics.md +++ b/deps/rabbitmq_prometheus/metrics.md @@ -136,6 +136,15 @@ These metrics are specific to the stream protocol. | rabbitmq_process_resident_memory_bytes | Memory used in bytes | | rabbitmq_resident_memory_limit_bytes | Memory high watermark in bytes | +### Per-mount disk space + +| Metric | Description | +| --- | --- | +| rabbitmq_mount_space_available_bytes | Disk space available in bytes on configured mount | +| rabbitmq_mount_space_available_limit_bytes | Disk space available low watermark in bytes on configured mount | + +Both metrics have a `disk` label identifying the configured disk name which was measured. + ### Connections | Metric | Description | diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_alarm_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_alarm_metrics_collector.erl index 284ff73c9fc8..911b9add6fd8 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_alarm_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_alarm_metrics_collector.erl @@ -35,11 +35,13 @@ collect_mf(_Registry, Callback) -> Alarms = rabbit_alarm:get_local_alarms(500), %% TODO: figure out timeout ActiveAlarms = lists:foldl(fun ({{resource_limit, disk, _}, _}, Acc) -> - maps:put(disk_limit, 1, Acc); + Acc#{disk_limit => 1}; + ({{resource_limit, {disk, QT}, _}, _}, Acc) -> + Acc#{{disk, QT} => 1}; ({{resource_limit, memory, _}, _}, Acc) -> - maps:put(memory_limit, 1, Acc); + Acc#{memory_limit => 1}; ({file_descriptor_limit, _}, Acc) -> - maps:put(file_descriptor_limit, 1, Acc) + Acc#{file_descriptor_limit => 1} end, #{}, Alarms), @@ -58,6 +60,15 @@ collect_mf(_Registry, Callback) -> <<"is 1 if VM memory watermark alarm is in effect">>, untyped, [untyped_metric(maps:get(memory_limit, ActiveAlarms, 0))])), + + Callback(create_mf(?METRIC_NAME(<<"queue_type_free_disk_space_watermark">>), + <<"is 1 if the queue type disk-space alarm is in effect">>, + untyped, + [prometheus_model_helpers:untyped_metric( + #{queue_type => QT}, + maps:get({disk, QT}, ActiveAlarms, 0)) || + {_, QT} <- rabbit_registry:lookup_all(queue)])), + ok catch exit:{timeout, _} -> diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl index 7f6ed70d56dc..a508a1d775f1 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl @@ -85,6 +85,10 @@ {2, undefined, erlang_net_ticktime_seconds, gauge, "Inter-node heartbeat interval", net_ticktime}, {2, ?MILLISECOND, erlang_uptime_seconds, gauge, "Node uptime", uptime} ]}, + {mount_metrics, [ + {2, undefined, mount_space_available_bytes, gauge, "Disk space available in bytes on configured mount"}, + {3, undefined, mount_space_available_limit_bytes, gauge, "Disk space available low watermark in bytes on configured mount"} + ]}, {node_persister_metrics, [ {2, undefined, io_read_ops_total, counter, "Total number of I/O read operations", io_read_count}, @@ -856,6 +860,12 @@ get_data(exchange_names, _, _) -> Label = <<"vhost=\"", VHost/binary, "\",exchange=\"", Name/binary, "\",type=\"", (atom_to_binary(EType))/binary, "\"">>, [{Label, 1}|Acc] end, [], rabbit_exchange:list()); +get_data(mount_metrics, _, _) -> + [{<<"disk=", Name/binary>>, Available, Limit} + || #{name := Name, + available := Available, + limit := Limit} <- rabbit_disk_monitor:get_mount_free(), + Available =/= 'NaN']; get_data(Table, _, _) -> ets:tab2list(Table).