diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index 40a7dd74994f..064859bc4a37 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -335,7 +335,7 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) -> throttle = #throttle{ last_blocked_at = never, should_block = false, - blocked_by = sets:new(), + blocked_by = sets:new([{version, 2}]), connection_blocked_message_sent = false }, proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock)}, @@ -1310,7 +1310,7 @@ handle_method0(#'connection.open'{virtual_host = VHost}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), - BlockedBy = sets:from_list([{resource, Alarm} || Alarm <- Alarms]), + BlockedBy = sets:from_list([{resource, Alarm} || Alarm <- Alarms], [{version, 2}]), Throttle1 = Throttle#throttle{blocked_by = BlockedBy}, {ok, ChannelSupSupPid} = diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl index 07ebabe6915f..de047e55a3f9 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl @@ -36,7 +36,7 @@ proc_state = connect_packet_unprocessed :: connect_packet_unprocessed | rabbit_mqtt_processor:state(), connection_state :: running | blocked, - conserve :: boolean(), + blocked_by :: sets:set(rabbit_alarm:resource_alarm_source()), stats_timer :: rabbit_event:state(), keepalive = rabbit_mqtt_keepalive:init() :: rabbit_mqtt_keepalive:state(), conn_name :: binary() @@ -53,8 +53,8 @@ start_link(Ref, _Transport, []) -> -spec conserve_resources(pid(), rabbit_alarm:resource_alarm_source(), rabbit_alarm:resource_alert()) -> ok. -conserve_resources(Pid, _, {_, Conserve, _}) -> - Pid ! {conserve_resources, Conserve}, +conserve_resources(Pid, Source, {_, Conserve, _}) -> + Pid ! {conserve_resources, Source, Conserve}, ok. -spec info(pid(), rabbit_types:info_keys()) -> @@ -78,7 +78,7 @@ init(Ref) -> {ok, ConnStr} -> ConnName = rabbit_data_coercion:to_binary(ConnStr), ?LOG_DEBUG("MQTT accepting TCP connection ~tp (~ts)", [self(), ConnName]), - _ = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), + Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), LoginTimeout = application:get_env(?APP_NAME, login_timeout, 10_000), erlang:send_after(LoginTimeout, self(), login_timeout), State0 = #state{socket = RealSocket, @@ -86,7 +86,7 @@ init(Ref) -> conn_name = ConnName, await_recv = false, connection_state = running, - conserve = false, + blocked_by = sets:from_list(Alarms, [{version, 2}]), parse_state = rabbit_mqtt_packet:init_state(), stats_timer = rabbit_event:init_stats_timer()}, State = control_throttle(State0), @@ -185,9 +185,16 @@ handle_info({Tag, Sock, Reason}, State = #state{socket = Sock}) when Tag =:= tcp_error; Tag =:= ssl_error -> network_error(Reason, State); -handle_info({conserve_resources, Conserve}, State) -> +handle_info({conserve_resources, Source, Conserve}, + #state{blocked_by = BlockedBy0} = State) -> + BlockedBy = case Conserve of + true -> + sets:add_element(Source, BlockedBy0); + false -> + sets:del_element(Source, BlockedBy0) + end, maybe_process_deferred_recv( - control_throttle(State #state{ conserve = Conserve })); + control_throttle(State #state{ blocked_by = BlockedBy })); handle_info({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), @@ -417,10 +424,11 @@ run_socket(State = #state{ socket = Sock }) -> State#state{ await_recv = true }. control_throttle(State = #state{connection_state = ConnState, - conserve = Conserve, + blocked_by = BlockedBy, proc_state = PState, keepalive = KState }) -> + Conserve = not sets:is_empty(BlockedBy), Throttle = case PState of connect_packet_unprocessed -> Conserve; _ -> rabbit_mqtt_processor:throttle(Conserve, PState) @@ -537,7 +545,7 @@ format_state(#state{socket = Socket, parse_state = _, proc_state = PState, connection_state = ConnectionState, - conserve = Conserve, + blocked_by = BlockedBy, stats_timer = StatsTimer, keepalive = Keepalive, conn_name = ConnName @@ -552,7 +560,7 @@ format_state(#state{socket = Socket, rabbit_mqtt_processor:format_status(PState) end, connection_state => ConnectionState, - conserve => Conserve, + blocked_by => lists:sort(sets:to_list(BlockedBy)), stats_timer => StatsTimer, keepalive => Keepalive, conn_name => ConnName}. diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl index a3e5b72dc697..0fa7d218ae84 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl @@ -33,7 +33,7 @@ parse_state, processor_state, state, - conserve_resources, + blocked_by :: sets:set(rabbit_alarm:resource_alarm_source()), recv_outstanding, max_frame_size, current_frame_size, @@ -82,7 +82,7 @@ init([SupHelperPid, Ref, Configuration]) -> [self(), ConnName]), ParseState = rabbit_stomp_frame:initial_state(), - _ = register_resource_alarm(), + Alarms = register_resource_alarm(), LoginTimeout = application:get_env(rabbitmq_stomp, login_timeout, 10_000), MaxFrameSize = application:get_env(rabbitmq_stomp, max_frame_size, ?DEFAULT_MAX_FRAME_SIZE), @@ -100,7 +100,7 @@ init([SupHelperPid, Ref, Configuration]) -> max_frame_size = MaxFrameSize, current_frame_size = 0, state = running, - conserve_resources = false, + blocked_by = sets:from_list(Alarms, [{version, 2}]), recv_outstanding = false})), #reader_state.stats_timer), {backoff, 1000, 1000, 10000}); {error, enotconn} -> @@ -146,8 +146,15 @@ handle_info({Tag, Sock, Reason}, State=#reader_state{socket=Sock}) {stop, {inet_error, Reason}, State}; handle_info(emit_stats, State) -> {noreply, emit_stats(State), hibernate}; -handle_info({conserve_resources, Conserve}, State) -> - NewState = State#reader_state{conserve_resources = Conserve}, +handle_info({conserve_resources, Source, Conserve}, + #reader_state{blocked_by = BlockedBy0} = State) -> + BlockedBy = case Conserve of + true -> + sets:add_element(Source, BlockedBy0); + false -> + sets:del_element(Source, BlockedBy0) + end, + NewState = State#reader_state{blocked_by = BlockedBy}, {noreply, run_socket(control_throttle(NewState)), hibernate}; handle_info({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), @@ -288,18 +295,19 @@ process_received_bytes(Bytes, -spec conserve_resources(pid(), rabbit_alarm:resource_alarm_source(), rabbit_alarm:resource_alert()) -> ok. -conserve_resources(Pid, _Source, {_, Conserve, _}) -> - Pid ! {conserve_resources, Conserve}, +conserve_resources(Pid, Source, {_, Conserve, _}) -> + Pid ! {conserve_resources, Source, Conserve}, ok. register_resource_alarm() -> rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}). -control_throttle(State = #reader_state{state = CS, - conserve_resources = Mem, +control_throttle(State = #reader_state{state = CS, + blocked_by = BlockedBy, heartbeat = Heartbeat}) -> - case {CS, Mem orelse credit_flow:blocked()} of + Conserve = not sets:is_empty(BlockedBy), + case {CS, Conserve orelse credit_flow:blocked()} of {running, true} -> State#reader_state{state = blocking}; {blocking, false} -> rabbit_heartbeat:resume_monitor(Heartbeat), State#reader_state{state = running}; diff --git a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl index 801cb83f8a93..fd36592391bd 100644 --- a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl +++ b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl @@ -38,7 +38,7 @@ proc_state = connect_packet_unprocessed :: connect_packet_unprocessed | rabbit_mqtt_processor:state(), connection_state = running :: running | blocked, - conserve = false :: boolean(), + blocked_by = sets:new([{version, 2}]) :: sets:set(rabbit_alarm:resource_alarm_source()), stats_timer :: option(rabbit_event:state()), keepalive = rabbit_mqtt_keepalive:init() :: rabbit_mqtt_keepalive:state(), conn_name :: option(binary()) @@ -110,8 +110,9 @@ websocket_init(State0 = #state{socket = Sock}) -> {ok, ConnStr} -> ConnName = rabbit_data_coercion:to_binary(ConnStr), ?LOG_INFO("Accepting Web MQTT connection ~s", [ConnName]), - _ = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), - State = State0#state{conn_name = ConnName}, + Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), + State = State0#state{conn_name = ConnName, + blocked_by = sets:from_list(Alarms, [{version, 2}])}, process_flag(trap_exit, true), {[], State, hibernate}; {error, Reason} -> @@ -121,8 +122,8 @@ websocket_init(State0 = #state{socket = Sock}) -> -spec conserve_resources(pid(), rabbit_alarm:resource_alarm_source(), rabbit_alarm:resource_alert()) -> ok. -conserve_resources(Pid, _, {_, Conserve, _}) -> - Pid ! {conserve_resources, Conserve}, +conserve_resources(Pid, Source, {_, Conserve, _}) -> + Pid ! {conserve_resources, Source, Conserve}, ok. -spec websocket_handle(ping | pong | {text | binary | ping | pong, binary()}, State) -> @@ -145,8 +146,15 @@ websocket_handle(Frame, State) -> -spec websocket_info(any(), State) -> {cowboy_websocket:commands(), State} | {cowboy_websocket:commands(), State, hibernate}. -websocket_info({conserve_resources, Conserve}, State) -> - handle_credits(State#state{conserve = Conserve}); +websocket_info({conserve_resources, Source, Conserve}, + #state{blocked_by = BlockedBy0} = State) -> + BlockedBy = case Conserve of + true -> + sets:add_element(Source, BlockedBy0); + false -> + sets:del_element(Source, BlockedBy0) + end, + handle_credits(State#state{blocked_by = BlockedBy}); websocket_info({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), handle_credits(State); @@ -371,10 +379,11 @@ handle_credits(State0) -> {[{active, Active}], State, hibernate}. control_throttle(State = #state{connection_state = ConnState, - conserve = Conserve, + blocked_by = BlockedBy, proc_state = PState, keepalive = KState }) -> + Conserve = not sets:is_empty(BlockedBy), Throttle = case PState of connect_packet_unprocessed -> Conserve; _ -> rabbit_mqtt_processor:throttle(Conserve, PState)