Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions deps/rabbit/src/rabbit_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) ->
throttle = #throttle{
last_blocked_at = never,
should_block = false,
blocked_by = sets:new(),
blocked_by = sets:new([{version, 2}]),
connection_blocked_message_sent = false
},
proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock)},
Expand Down Expand Up @@ -1310,7 +1310,7 @@ handle_method0(#'connection.open'{virtual_host = VHost},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),

Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
BlockedBy = sets:from_list([{resource, Alarm} || Alarm <- Alarms]),
BlockedBy = sets:from_list([{resource, Alarm} || Alarm <- Alarms], [{version, 2}]),
Throttle1 = Throttle#throttle{blocked_by = BlockedBy},

{ok, ChannelSupSupPid} =
Expand Down
28 changes: 18 additions & 10 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
proc_state = connect_packet_unprocessed :: connect_packet_unprocessed |
rabbit_mqtt_processor:state(),
connection_state :: running | blocked,
conserve :: boolean(),
blocked_by :: sets:set(rabbit_alarm:resource_alarm_source()),
stats_timer :: rabbit_event:state(),
keepalive = rabbit_mqtt_keepalive:init() :: rabbit_mqtt_keepalive:state(),
conn_name :: binary()
Expand All @@ -53,8 +53,8 @@ start_link(Ref, _Transport, []) ->
-spec conserve_resources(pid(),
rabbit_alarm:resource_alarm_source(),
rabbit_alarm:resource_alert()) -> ok.
conserve_resources(Pid, _, {_, Conserve, _}) ->
Pid ! {conserve_resources, Conserve},
conserve_resources(Pid, Source, {_, Conserve, _}) ->
Pid ! {conserve_resources, Source, Conserve},
ok.

-spec info(pid(), rabbit_types:info_keys()) ->
Expand All @@ -78,15 +78,15 @@ init(Ref) ->
{ok, ConnStr} ->
ConnName = rabbit_data_coercion:to_binary(ConnStr),
?LOG_DEBUG("MQTT accepting TCP connection ~tp (~ts)", [self(), ConnName]),
_ = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
LoginTimeout = application:get_env(?APP_NAME, login_timeout, 10_000),
erlang:send_after(LoginTimeout, self(), login_timeout),
State0 = #state{socket = RealSocket,
proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock),
conn_name = ConnName,
await_recv = false,
connection_state = running,
conserve = false,
blocked_by = sets:from_list(Alarms, [{version, 2}]),
parse_state = rabbit_mqtt_packet:init_state(),
stats_timer = rabbit_event:init_stats_timer()},
State = control_throttle(State0),
Expand Down Expand Up @@ -185,9 +185,16 @@ handle_info({Tag, Sock, Reason}, State = #state{socket = Sock})
when Tag =:= tcp_error; Tag =:= ssl_error ->
network_error(Reason, State);

handle_info({conserve_resources, Conserve}, State) ->
handle_info({conserve_resources, Source, Conserve},
#state{blocked_by = BlockedBy0} = State) ->
BlockedBy = case Conserve of
true ->
sets:add_element(Source, BlockedBy0);
false ->
sets:del_element(Source, BlockedBy0)
end,
maybe_process_deferred_recv(
control_throttle(State #state{ conserve = Conserve }));
control_throttle(State #state{ blocked_by = BlockedBy }));

handle_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
Expand Down Expand Up @@ -417,10 +424,11 @@ run_socket(State = #state{ socket = Sock }) ->
State#state{ await_recv = true }.

control_throttle(State = #state{connection_state = ConnState,
conserve = Conserve,
blocked_by = BlockedBy,
proc_state = PState,
keepalive = KState
}) ->
Conserve = not sets:is_empty(BlockedBy),
Throttle = case PState of
connect_packet_unprocessed -> Conserve;
_ -> rabbit_mqtt_processor:throttle(Conserve, PState)
Expand Down Expand Up @@ -537,7 +545,7 @@ format_state(#state{socket = Socket,
parse_state = _,
proc_state = PState,
connection_state = ConnectionState,
conserve = Conserve,
blocked_by = BlockedBy,
stats_timer = StatsTimer,
keepalive = Keepalive,
conn_name = ConnName
Expand All @@ -552,7 +560,7 @@ format_state(#state{socket = Socket,
rabbit_mqtt_processor:format_status(PState)
end,
connection_state => ConnectionState,
conserve => Conserve,
blocked_by => lists:sort(sets:to_list(BlockedBy)),
stats_timer => StatsTimer,
keepalive => Keepalive,
conn_name => ConnName}.
28 changes: 18 additions & 10 deletions deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
parse_state,
processor_state,
state,
conserve_resources,
blocked_by :: sets:set(rabbit_alarm:resource_alarm_source()),
recv_outstanding,
max_frame_size,
current_frame_size,
Expand Down Expand Up @@ -82,7 +82,7 @@ init([SupHelperPid, Ref, Configuration]) ->
[self(), ConnName]),

ParseState = rabbit_stomp_frame:initial_state(),
_ = register_resource_alarm(),
Alarms = register_resource_alarm(),

LoginTimeout = application:get_env(rabbitmq_stomp, login_timeout, 10_000),
MaxFrameSize = application:get_env(rabbitmq_stomp, max_frame_size, ?DEFAULT_MAX_FRAME_SIZE),
Expand All @@ -100,7 +100,7 @@ init([SupHelperPid, Ref, Configuration]) ->
max_frame_size = MaxFrameSize,
current_frame_size = 0,
state = running,
conserve_resources = false,
blocked_by = sets:from_list(Alarms, [{version, 2}]),
recv_outstanding = false})), #reader_state.stats_timer),
{backoff, 1000, 1000, 10000});
{error, enotconn} ->
Expand Down Expand Up @@ -146,8 +146,15 @@ handle_info({Tag, Sock, Reason}, State=#reader_state{socket=Sock})
{stop, {inet_error, Reason}, State};
handle_info(emit_stats, State) ->
{noreply, emit_stats(State), hibernate};
handle_info({conserve_resources, Conserve}, State) ->
NewState = State#reader_state{conserve_resources = Conserve},
handle_info({conserve_resources, Source, Conserve},
#reader_state{blocked_by = BlockedBy0} = State) ->
BlockedBy = case Conserve of
true ->
sets:add_element(Source, BlockedBy0);
false ->
sets:del_element(Source, BlockedBy0)
end,
NewState = State#reader_state{blocked_by = BlockedBy},
{noreply, run_socket(control_throttle(NewState)), hibernate};
handle_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
Expand Down Expand Up @@ -288,18 +295,19 @@ process_received_bytes(Bytes,
-spec conserve_resources(pid(),
rabbit_alarm:resource_alarm_source(),
rabbit_alarm:resource_alert()) -> ok.
conserve_resources(Pid, _Source, {_, Conserve, _}) ->
Pid ! {conserve_resources, Conserve},
conserve_resources(Pid, Source, {_, Conserve, _}) ->
Pid ! {conserve_resources, Source, Conserve},
ok.

register_resource_alarm() ->
rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}).


control_throttle(State = #reader_state{state = CS,
conserve_resources = Mem,
control_throttle(State = #reader_state{state = CS,
blocked_by = BlockedBy,
heartbeat = Heartbeat}) ->
case {CS, Mem orelse credit_flow:blocked()} of
Conserve = not sets:is_empty(BlockedBy),
case {CS, Conserve orelse credit_flow:blocked()} of
{running, true} -> State#reader_state{state = blocking};
{blocking, false} -> rabbit_heartbeat:resume_monitor(Heartbeat),
State#reader_state{state = running};
Expand Down
25 changes: 17 additions & 8 deletions deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
proc_state = connect_packet_unprocessed :: connect_packet_unprocessed |
rabbit_mqtt_processor:state(),
connection_state = running :: running | blocked,
conserve = false :: boolean(),
blocked_by = sets:new([{version, 2}]) :: sets:set(rabbit_alarm:resource_alarm_source()),
stats_timer :: option(rabbit_event:state()),
keepalive = rabbit_mqtt_keepalive:init() :: rabbit_mqtt_keepalive:state(),
conn_name :: option(binary())
Expand Down Expand Up @@ -110,8 +110,9 @@ websocket_init(State0 = #state{socket = Sock}) ->
{ok, ConnStr} ->
ConnName = rabbit_data_coercion:to_binary(ConnStr),
?LOG_INFO("Accepting Web MQTT connection ~s", [ConnName]),
_ = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
State = State0#state{conn_name = ConnName},
Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
State = State0#state{conn_name = ConnName,
blocked_by = sets:from_list(Alarms, [{version, 2}])},
process_flag(trap_exit, true),
{[], State, hibernate};
{error, Reason} ->
Expand All @@ -121,8 +122,8 @@ websocket_init(State0 = #state{socket = Sock}) ->
-spec conserve_resources(pid(),
rabbit_alarm:resource_alarm_source(),
rabbit_alarm:resource_alert()) -> ok.
conserve_resources(Pid, _, {_, Conserve, _}) ->
Pid ! {conserve_resources, Conserve},
conserve_resources(Pid, Source, {_, Conserve, _}) ->
Pid ! {conserve_resources, Source, Conserve},
ok.

-spec websocket_handle(ping | pong | {text | binary | ping | pong, binary()}, State) ->
Expand All @@ -145,8 +146,15 @@ websocket_handle(Frame, State) ->
-spec websocket_info(any(), State) ->
{cowboy_websocket:commands(), State} |
{cowboy_websocket:commands(), State, hibernate}.
websocket_info({conserve_resources, Conserve}, State) ->
handle_credits(State#state{conserve = Conserve});
websocket_info({conserve_resources, Source, Conserve},
#state{blocked_by = BlockedBy0} = State) ->
BlockedBy = case Conserve of
true ->
sets:add_element(Source, BlockedBy0);
false ->
sets:del_element(Source, BlockedBy0)
end,
handle_credits(State#state{blocked_by = BlockedBy});
websocket_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
handle_credits(State);
Expand Down Expand Up @@ -371,10 +379,11 @@ handle_credits(State0) ->
{[{active, Active}], State, hibernate}.

control_throttle(State = #state{connection_state = ConnState,
conserve = Conserve,
blocked_by = BlockedBy,
proc_state = PState,
keepalive = KState
}) ->
Conserve = not sets:is_empty(BlockedBy),
Throttle = case PState of
connect_packet_unprocessed -> Conserve;
_ -> rabbit_mqtt_processor:throttle(Conserve, PState)
Expand Down
Loading