From 103fd11cc2a7f21c1ffbc67962c3ce3046f7ac95 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Mon, 16 Jun 2025 11:20:30 -0400 Subject: [PATCH] WIP: Add per-queue-type disk limits This is an extension of the idea of the free disk space alarm. That option enables blocking publishers when the free disk space on the data dir's disk falls below some threshold. This feature blocks publishers of individual queue types when the disk space taken by the queue type exceeds the configured threshold. This change is incomplete: it only affects QQs and streams and AMQP 0-9-1 so far. --- deps/rabbit/priv/schema/rabbit.schema | 20 +++ deps/rabbit/src/rabbit_alarm.erl | 15 +- deps/rabbit/src/rabbit_channel.erl | 28 ++- deps/rabbit/src/rabbit_disk_usage.erl | 76 +++++++++ deps/rabbit/src/rabbit_queue_type.erl | 10 ++ .../src/rabbit_queue_type_disk_monitor.erl | 159 ++++++++++++++++++ deps/rabbit/src/rabbit_quorum_queue.erl | 15 ++ deps/rabbit/src/rabbit_reader.erl | 78 +++++++-- deps/rabbit/src/rabbit_stream_queue.erl | 17 +- example.conf | 2 + 10 files changed, 396 insertions(+), 24 deletions(-) create mode 100644 deps/rabbit/src/rabbit_disk_usage.erl create mode 100644 deps/rabbit/src/rabbit_queue_type_disk_monitor.erl create mode 100644 example.conf diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index ba20e864fdb3..79c09fff67e1 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -1241,6 +1241,26 @@ fun(Conf) -> end end}. +%% +%% Per-queue-type disk limits +%% ===================== +%% + +%% TODO: do relative limits make sense - what would they be relative to? The +%% full disk size? +%% {mapping, "quorum_queue_disk_limit.relative", "rabbit.quorum_queue_disk_limit", [ +%% {datatype, float}]}. + +{mapping, "quorum_queue_disk_limit.absolute", "rabbit.quorum_queue_disk_limit", [ + {datatype, [integer, string]}, + {validators, ["is_supported_information_unit"]} +]}. + +{mapping, "stream_queue_disk_limit.absolute", "rabbit.stream_queue_disk_limit", [ + {datatype, [integer, string]}, + {validators, ["is_supported_information_unit"]} +]}. + %% %% Clustering %% ===================== diff --git a/deps/rabbit/src/rabbit_alarm.erl b/deps/rabbit/src/rabbit_alarm.erl index ef5b55dfa9f8..c4096c081aca 100644 --- a/deps/rabbit/src/rabbit_alarm.erl +++ b/deps/rabbit/src/rabbit_alarm.erl @@ -46,7 +46,7 @@ -export_type([alarm/0]). -type local_alarm() :: 'file_descriptor_limit'. --type resource_alarm_source() :: 'disk' | 'memory'. +-type resource_alarm_source() :: 'disk' | 'memory' | {queue_type_disk, atom()}. -type resource_alarm() :: {resource_limit, resource_alarm_source(), node()}. -type alarm() :: local_alarm() | resource_alarm(). -type resource_alert() :: {WasAlarmSetForNode :: boolean(), @@ -138,6 +138,11 @@ format_as_map({resource_limit, memory, Node}) -> <<"resource">> => ?MEMORY_RESOURCE, <<"node">> => Node }; +format_as_map({resource_limit, {queue_type_disk, QType}, Node}) -> + #{ + <<"resource">> => todo, + <<"node">> => Node + }; format_as_map({resource_limit, Limit, Node}) -> #{ <<"resource">> => rabbit_data_coercion:to_binary(Limit), @@ -291,7 +296,7 @@ maybe_alert(UpdateFun, Node, Source, WasAlertAdded, StillHasAlerts = lists:any(fun ({_Node, NodeAlerts}) -> lists:member(Source, NodeAlerts) end, dict:to_list(AN1)), case StillHasAlerts of true -> ok; - false -> rabbit_log:warning("~ts resource limit alarm cleared across the cluster", [Source]) + false -> rabbit_log:warning("~tp resource limit alarm cleared across the cluster", [Source]) end, Alert = {WasAlertAdded, StillHasAlerts, Node}, case node() of @@ -326,9 +331,11 @@ internal_register(Pid, {M, F, A} = AlertMFA, NewAlertees = dict:store(Pid, AlertMFA, Alertees), State#alarms{alertees = NewAlertees}. +%% TODO: handle formatting of resources in these: + handle_set_resource_alarm(Source, Node, State) -> rabbit_log:warning( - "~ts resource limit alarm set on node ~tp.~n~n" + "~tp resource limit alarm set on node ~tp.~n~n" "**********************************************************~n" "*** Publishers will be blocked until this alarm clears ***~n" "**********************************************************~n", @@ -347,7 +354,7 @@ handle_set_alarm(Alarm, State) -> {ok, State}. handle_clear_resource_alarm(Source, Node, State) -> - rabbit_log:warning("~ts resource limit alarm cleared on node ~tp", + rabbit_log:warning("~tp resource limit alarm cleared on node ~tp", [Source, Node]), {ok, maybe_alert(fun dict_unappend/3, Node, Source, false, State)}. diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index ed5b58845a59..85490fbd8ef9 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -163,6 +163,7 @@ delivery_flow :: flow | noflow, interceptor_state, queue_states, + queue_types_published :: sets:set(QType :: atom()), tick_timer, publishing_mode = false :: boolean() }). @@ -527,7 +528,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, reply_consumer = none, delivery_flow = Flow, interceptor_state = undefined, - queue_states = rabbit_queue_type:init() + queue_states = rabbit_queue_type:init(), + queue_types_published = sets:new([{version, 2}]) }, State1 = State#ch{ interceptor_state = rabbit_channel_interceptor:init(State)}, @@ -2057,9 +2059,10 @@ deliver_to_queues(XName, ok = process_routing_mandatory(Mandatory, RoutedToQueues, Message, XName, State0), MsgSeqNo = maps:get(correlation, Options, undefined), State1 = process_routing_confirm(MsgSeqNo, QueueNames, XName, State0), + State2 = notify_published_queue_types(Qs, State1), %% Actions must be processed after registering confirms as actions may %% contain rejections of publishes - State = handle_queue_actions(Actions, State1#ch{queue_states = QueueStates}), + State = handle_queue_actions(Actions, State2#ch{queue_states = QueueStates}), case rabbit_event:stats_level(State, #ch.stats_timer) of fine -> ?INCR_STATS(exchange_stats, XName, 1, publish), @@ -2082,6 +2085,27 @@ deliver_to_queues(XName, [rabbit_misc:rs(Resource)]) end. +notify_published_queue_types(Qs, + #ch{cfg = #conf{reader_pid = ReaderPid}, + queue_types_published = QTypes0} = State0) -> + QTypes = lists:foldl( + fun(Q0, Acc) -> + Q = case Q0 of + {Q1, _RouteInfo} -> Q1; + _ -> Q0 + end, + QType = amqqueue:get_type(Q), + case sets:is_element(QType, Acc) of + true -> + Acc; + false -> + ReaderPid ! {channel_published_to_queue_type, + self(), QType}, + sets:add_element(QType, Acc) + end + end, QTypes0, Qs), + State0#ch{queue_types_published = QTypes}. + process_routing_mandatory(_Mandatory = true, _RoutedToQs = [], Msg, diff --git a/deps/rabbit/src/rabbit_disk_usage.erl b/deps/rabbit/src/rabbit_disk_usage.erl new file mode 100644 index 000000000000..53a176a7ef93 --- /dev/null +++ b/deps/rabbit/src/rabbit_disk_usage.erl @@ -0,0 +1,76 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(rabbit_disk_usage). + +%% Functions for calculating disk usage of a given directory. + +-include_lib("kernel/include/file.hrl"). + +-export([scan/1]). + +%% @doc Calculates the disk usage in bytes of the given directory. +%% +%% On especially large directories this can be an expensive operation since +%% each sub-directory is scanned recursively and each file's metadata must be +%% read. +-spec scan(Dir) -> {ok, Size} | {error, Error} when + Dir :: filename:filename_all(), + Size :: non_neg_integer(), + Error :: not_directory | file:posix() | badarg. +scan(Dir) -> + case file:read_file_info(Dir) of + {ok, #file_info{type = directory, size = S}} -> + {ok, Gatherer} = gatherer:start_link(), + scan_directory(Dir, Gatherer), + Size = sum(Gatherer, S), + gatherer:stop(Gatherer), + {ok, Size}; + {ok, #file_info{}} -> + {error, not_directory}; + {error, _} = Err -> + Err + end. + +scan_directory(Dir, Gatherer) -> + gatherer:fork(Gatherer), + worker_pool:submit_async(fun() -> scan_directory0(Dir, Gatherer) end). + +scan_directory0(Dir, Gatherer) -> + link(Gatherer), + Size = case file:list_dir_all(Dir) of + {ok, Entries} -> + lists:foldl( + fun(Entry, Acc) -> + Path = filename:join(Dir, Entry), + case file:read_file_info(Path) of + {ok, #file_info{type = directory, + size = S}} -> + scan_directory(Path, Gatherer), + Acc + S; + {ok, #file_info{size = S}} -> + Acc + S; + _ -> + Acc + end + end, 0, Entries); + _ -> + 0 + end, + gatherer:in(Gatherer, Size), + gatherer:finish(Gatherer), + unlink(Gatherer), + ok. + +-spec sum(pid(), non_neg_integer()) -> non_neg_integer(). +sum(Gatherer, Size) -> + case gatherer:out(Gatherer) of + empty -> + Size; + {value, S} -> + sum(Gatherer, Size + S) + end. diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index d11b1ec14fa8..c761e7403a5e 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -304,6 +304,16 @@ -callback queue_vm_ets() -> {StatsKeys :: [atom()], ETSNames:: [[atom()]]}. +%% The disk usage limit for the queue type, if any. +-callback disk_limit() -> rabbit_queue_type_disk_monitor:disk_usage_limit_spec() | undefined. +%% Calculate the disk space in bytes of the queue type. +%% This callback is optional but must be implemented if `disk_limit/0' is +%% defined. +-callback disk_footprint() -> {ok, Bytes :: non_neg_integer()} | {error, file:posix()}. + +-optional_callbacks([disk_footprint/0, + disk_limit/0]). + -spec discover(binary() | atom()) -> queue_type(). discover(<<"undefined">>) -> fallback(); diff --git a/deps/rabbit/src/rabbit_queue_type_disk_monitor.erl b/deps/rabbit/src/rabbit_queue_type_disk_monitor.erl new file mode 100644 index 000000000000..7d7f68a54346 --- /dev/null +++ b/deps/rabbit/src/rabbit_queue_type_disk_monitor.erl @@ -0,0 +1,159 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(rabbit_queue_type_disk_monitor). + +%% A server for alarming on high disk usage per queue type. +%% +%% The server scans periodically and checks each queue type against its limit +%% using the `disk_footprint/0' and `disk_limit/0' callbacks in +%% `rabbit_queue_type'. Typically this callback uses `rabbit_disk_usage:scan/1'. +%% +%% Also see `rabbit_disk_monitoring' which periodically checks the total space +%% taken on the mounted disk containing `rabbit:data_dir/0'. + +-include_lib("kernel/include/logger.hrl"). + +-behaviour(gen_server). + +-export([start_link/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(limit, {queue_type :: queue_type(), + type_module :: module(), + limit :: Bytes :: non_neg_integer()}). + +-record(state, {limits :: [#limit{}], + alarmed = alarmed() :: alarmed(), + timer :: timer:tref() | undefined}). + +-type queue_type() :: atom(). +-type alarmed() :: sets:set(queue_type()). + +-type disk_usage_limit_spec() :: %% A total number of bytes + {absolute, non_neg_integer()} | + %% %% A fraction of the disk's capacity. + %% {relative, float()} | + %% A string which will be parsed and + %% interpreted as an absolute limit. + string(). + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + Limits = lists:foldl( + fun({Type, TypeModule}, Acc) -> + case get_limit(Type, TypeModule) of + {ok, Limit} -> + [#limit{queue_type = Type, + type_module = TypeModule, + limit = Limit} | Acc]; + error -> + Acc + end + end, [], rabbit_registry:lookup_all(queue)), + Timer = erlang:send_after(5_000, self(), scan), + {ok, #state{limits = Limits, timer = Timer}}. + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(scan, #state{alarmed = Alarmed0} = State) -> + Alarmed = lists:foldl(fun scan/2, alarmed(), State#state.limits), + ok = handle_alarmed(Alarmed0, Alarmed), + Timer = erlang:send_after(5_000, self(), scan), + {noreply, State#state{alarmed = Alarmed, timer = Timer}}; +handle_info(Info, State) -> + ?LOG_DEBUG("~tp unhandled msg: ~tp", [?MODULE, Info]), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- + +alarmed() -> sets:new([{version, 2}]). + +-spec get_limit(atom(), module()) -> {ok, disk_usage_limit_spec()} | error. +get_limit(QType, QTypeModule) -> + try QTypeModule:disk_limit() of + undefined -> + error; + {absolute, Abs} when is_integer(Abs) andalso Abs >= 0 -> + {ok, Abs}; + %% {relative, Rel} when is_float(Rel) andalso Rel >= 0.0 -> + %% TODO: to convert to abs we need to cache the disk capacity for + %% the first `relative' spec we see. + %% Do we even need relative? Should it be proportional to the disk + %% capacity or to the other components? + %% {ok, {relative, Rel}}; + String when is_list(String) -> + case rabbit_resource_monitor_misc:parse_information_unit(String) of + {ok, Bytes} -> + {ok, Bytes}; + {error, parse_error} -> + ?LOG_WARNING("Unable to parse disk limit ~tp for queue " + "type '~ts'", [String, QType]), + error + end + catch + error:undef -> + error + end. + +-spec scan(Limit :: #limit{}, alarmed()) -> alarmed(). +scan(#limit{queue_type = QType, + type_module = QTypeModule, + limit = Limit}, Alarmed) -> + %% NOTE: `disk_footprint/0' is an optional callback but it should always + %% be implemented if the queue type implements `disk_limit/0'. + case QTypeModule:disk_footprint() of + {ok, Bytes} -> + %% TODO: remove this printf debugging... + ?LOG_INFO("Measured queue type '~ts' at ~p bytes (limit ~p)", [QType, Bytes, Limit]), + case Bytes >= Limit of + true -> sets:add_element(QTypeModule, Alarmed); + false -> Alarmed + end; + {error, enoent} -> + Alarmed; + {error, Error} -> + ?LOG_WARNING("Failed to calculate disk usage of queue type '~ts': " + "~tp", [QType, Error]), + Alarmed + end. + +-spec handle_alarmed(Before :: alarmed(), After :: alarmed()) -> ok. +handle_alarmed(NoChange, NoChange) -> + ok; +handle_alarmed(Before, After) -> + Added = sets:subtract(After, Before), + ?LOG_WARNING("Newly alarmed: ~p", [Added]), + ok = sets:fold( + fun(QType, ok) -> + rabbit_alarm:set_alarm({alarm(QType), []}) + end, ok, Added), + Removed = sets:subtract(Before, After), + ?LOG_WARNING("Stopped alarming: ~p", [Removed]), + ok = sets:fold( + fun(QType, ok) -> + rabbit_alarm:clear_alarm(alarm(QType)) + end, ok, Removed), + ok. + +alarm(QType) -> + {resource_limit, {queue_type_disk, QType}, node()}. diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 9c0e7fd9ca3e..33fece2720c1 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -67,6 +67,8 @@ -export([notify_decorators/1, notify_decorators/3, spawn_notify_decorators/3]). +-export([disk_footprint/0, + disk_limit/0]). -export([is_enabled/0, is_compatible/3, @@ -2066,6 +2068,19 @@ notify_decorators(QName, F, A) -> is_stateful() -> true. +-spec disk_footprint() -> {ok, Bytes :: non_neg_integer()} | {error, file:posix()}. +disk_footprint() -> + case ra_system:fetch(?RA_SYSTEM) of + #{data_dir := Dir} -> + rabbit_disk_usage:scan(Dir); + _ -> + {ok, 0} + end. + +-spec disk_limit() -> rabbit_queue_type_disk_monitor:disk_usage_limit_spec() | undefined. +disk_limit() -> + application:get_env(rabbit, quorum_queue_disk_limit, undefined). + force_shrink_member_to_current_member(VHost, Name) -> Node = node(), QName = rabbit_misc:r(VHost, queue, Name), diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index e89595e469b3..d2933f7f6d71 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -109,16 +109,19 @@ %% never | timestamp() last_blocked_at, %% a set of the reasons why we are - %% blocked: {resource, memory}, {resource, disk}. + %% blocked: {resource, memory}, {resource, disk}, flow, ... %% More reasons can be added in the future. - blocked_by, + blocked_by :: sets:set(flow | {resource, memory | disk | {queue_type_disk, atom()}}), %% true if received any publishes, false otherwise %% note that this will also be true when connection is %% already blocked should_block, %% true if we had we sent a connection.blocked, %% false otherwise - connection_blocked_message_sent + connection_blocked_message_sent, + %% + published_to_queue_types = #{} :: #{ChPid :: pid() => + sets:set(QType :: atom())} }). -define(CREATION_EVENT_KEYS, @@ -608,6 +611,14 @@ handle_other({channel_closing, ChPid}, State) -> ok = rabbit_channel:ready_for_close(ChPid), {_, State1} = channel_cleanup(ChPid, State), maybe_close(control_throttle(State1)); +handle_other({channel_published_to_queue_type, ChPid, QType}, + #v1{throttle = Throttle0} = State0) -> + QTypes = maps:update_with( + ChPid, fun(Ts) -> sets:add_element(QType, Ts) end, + sets:from_list([QType], [{version, 2}]), + Throttle0#throttle.published_to_queue_types), + Throttle = Throttle0#throttle{published_to_queue_types = QTypes}, + State0#v1{throttle = Throttle}; handle_other({'EXIT', Parent, normal}, State = #v1{parent = Parent}) -> %% rabbitmq/rabbitmq-server#544 %% The connection port process has exited due to the TCP socket being closed. @@ -1004,14 +1015,22 @@ is_over_node_channel_limit() -> end end. -channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) -> +channel_cleanup(ChPid, #v1{channel_count = ChannelCount, + throttle = Throttle0} = State) -> case get({ch_pid, ChPid}) of - undefined -> {undefined, State}; - {Channel, MRef} -> credit_flow:peer_down(ChPid), - erase({channel, Channel}), - erase({ch_pid, ChPid}), - erlang:demonitor(MRef, [flush]), - {Channel, State#v1{channel_count = ChannelCount - 1}} + undefined -> + {undefined, State}; + {Channel, MRef} -> + credit_flow:peer_down(ChPid), + erase({channel, Channel}), + erase({ch_pid, ChPid}), + erlang:demonitor(MRef, [flush]), + %% TODO: reevaluate throttle now that the connection may no longer + %% have a publisher to a queue type which is in disk alarm. + QTypes = maps:remove(ChPid, Throttle0#throttle.published_to_queue_types), + Throttle = Throttle0#throttle{published_to_queue_types = QTypes}, + {Channel, State#v1{channel_count = ChannelCount - 1, + throttle = Throttle}} end. all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. @@ -1730,7 +1749,9 @@ blocked_by_message(#throttle{blocked_by = Reasons}) -> format_blocked_by({resource, memory}) -> "memory"; format_blocked_by({resource, disk}) -> "disk"; -format_blocked_by({resource, disc}) -> "disk". +format_blocked_by({resource, disc}) -> "disk"; +format_blocked_by({resource, {queue_type_disk, QType}}) -> + lists:flatten(io_lib:format("~ts disk", [QType])). update_last_blocked_at(Throttle) -> Throttle#throttle{last_blocked_at = erlang:monotonic_time()}. @@ -1738,22 +1759,45 @@ update_last_blocked_at(Throttle) -> connection_blocked_message_sent( #throttle{connection_blocked_message_sent = BS}) -> BS. -should_send_blocked(Throttle = #throttle{blocked_by = Reasons}) -> +should_send_blocked(Throttle) -> should_block(Throttle) andalso - sets:size(sets:del_element(flow, Reasons)) =/= 0 + do_throttle_reasons_apply(Throttle) andalso not connection_blocked_message_sent(Throttle). -should_send_unblocked(Throttle = #throttle{blocked_by = Reasons}) -> +should_send_unblocked(Throttle) -> connection_blocked_message_sent(Throttle) andalso - sets:size(sets:del_element(flow, Reasons)) == 0. + not do_throttle_reasons_apply(Throttle). + +do_throttle_reasons_apply(#throttle{blocked_by = Reasons} = Throttle) -> + lists:any( + fun ({resource, disk}) -> + true; + ({resource, memory}) -> + true; + ({resource, {queue_type_disk, QType}}) -> + has_published_to_queue_type(QType, Throttle); + (_) -> + %% NOTE: flow reason is ignored. + false + end, sets:to_list(Reasons)). + +has_published_to_queue_type( + QType, #throttle{published_to_queue_types = QTypes}) -> + rabbit_misc:maps_any( + fun(_ChPid, ChQTypes) -> sets:is_element(QType, ChQTypes) end, QTypes). %% Returns true if we have a reason to block %% this connection. -has_reasons_to_block(#throttle{blocked_by = Reasons}) -> - sets:size(Reasons) > 0. +has_reasons_to_block(#throttle{blocked_by = Reasons} = Throttle) -> + lists:any( + fun ({resource, {queue_type_disk, QType}}) -> + has_published_to_queue_type(QType, Throttle); + (_) -> + true + end, sets:to_list(Reasons)). is_blocked_by_flow(#throttle{blocked_by = Reasons}) -> sets:is_element(flow, Reasons). diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index dc240e04eee1..d2c883a6823f 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -40,7 +40,9 @@ format/2, capabilities/0, notify_decorators/1, - is_stateful/0]). + is_stateful/0, + disk_footprint/0, + disk_limit/0]). -export([list_with_minimum_quorum/0]). @@ -1357,6 +1359,19 @@ notify_decorators(Q) when ?is_amqqueue(Q) -> %% Not supported ok. +-spec disk_footprint() -> {ok, Bytes :: non_neg_integer()} | {error, file:posix()}. +disk_footprint() -> + case application:get_env(osiris, data_dir) of + {ok, Dir} -> + rabbit_disk_usage:scan(Dir); + _ -> + {ok, 0} + end. + +-spec disk_limit() -> rabbit_queue_type_disk_monitor:disk_usage_limit_spec() | undefined. +disk_limit() -> + application:get_env(rabbit, stream_queue_disk_limit, undefined). + resend_all(#stream_client{leader = LeaderPid, writer_id = WriterId, correlation = Corrs} = State) -> diff --git a/example.conf b/example.conf new file mode 100644 index 000000000000..18062994fb08 --- /dev/null +++ b/example.conf @@ -0,0 +1,2 @@ +quorum_queue_disk_limit.absolute = 2GiB +stream_queue_disk_limit.absolute = 10GiB