Skip to content

Commit 971d32e

Browse files
committed
Handle multiple alarms consistently in protocol readers
The MQTT, WebMQTT and STOMP reader modules tracked whether they were blocked by any alarm, but they should instead keep a set of active alarms. Only tracking the `Conserve` part of the alarm notification from `rabbit_alarm` doesn't work correctly if a cluster enters and exits multiple alarms. For example if a cluster had both memory and disk alarms active and then cleared memory, these readers would unblock while they should remain blocked until both alarms are cleared. Keeping a set of `rabbit_alarm:resource_alarm_source()`s matches the AMQP 0-9-1 and 1.0 readers. This change also updates the 0-9-1 reader to use map-based `sets` for the sake of consistency.
1 parent f412512 commit 971d32e

File tree

4 files changed

+55
-30
lines changed

4 files changed

+55
-30
lines changed

deps/rabbit/src/rabbit_reader.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) ->
335335
throttle = #throttle{
336336
last_blocked_at = never,
337337
should_block = false,
338-
blocked_by = sets:new(),
338+
blocked_by = sets:new([{version, 2}]),
339339
connection_blocked_message_sent = false
340340
},
341341
proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock)},
@@ -1310,7 +1310,7 @@ handle_method0(#'connection.open'{virtual_host = VHost},
13101310
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
13111311

13121312
Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
1313-
BlockedBy = sets:from_list([{resource, Alarm} || Alarm <- Alarms]),
1313+
BlockedBy = sets:from_list([{resource, Alarm} || Alarm <- Alarms], [{version, 2}]),
13141314
Throttle1 = Throttle#throttle{blocked_by = BlockedBy},
13151315

13161316
{ok, ChannelSupSupPid} =

deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
proc_state = connect_packet_unprocessed :: connect_packet_unprocessed |
3737
rabbit_mqtt_processor:state(),
3838
connection_state :: running | blocked,
39-
conserve :: boolean(),
39+
blocked_by :: sets:set(rabbit_alarm:resource_alarm_source()),
4040
stats_timer :: rabbit_event:state(),
4141
keepalive = rabbit_mqtt_keepalive:init() :: rabbit_mqtt_keepalive:state(),
4242
conn_name :: binary()
@@ -53,8 +53,8 @@ start_link(Ref, _Transport, []) ->
5353
-spec conserve_resources(pid(),
5454
rabbit_alarm:resource_alarm_source(),
5555
rabbit_alarm:resource_alert()) -> ok.
56-
conserve_resources(Pid, _, {_, Conserve, _}) ->
57-
Pid ! {conserve_resources, Conserve},
56+
conserve_resources(Pid, Source, {_, Conserve, _}) ->
57+
Pid ! {conserve_resources, Source, Conserve},
5858
ok.
5959

6060
-spec info(pid(), rabbit_types:info_keys()) ->
@@ -78,15 +78,15 @@ init(Ref) ->
7878
{ok, ConnStr} ->
7979
ConnName = rabbit_data_coercion:to_binary(ConnStr),
8080
?LOG_DEBUG("MQTT accepting TCP connection ~tp (~ts)", [self(), ConnName]),
81-
_ = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
81+
Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
8282
LoginTimeout = application:get_env(?APP_NAME, login_timeout, 10_000),
8383
erlang:send_after(LoginTimeout, self(), login_timeout),
8484
State0 = #state{socket = RealSocket,
8585
proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock),
8686
conn_name = ConnName,
8787
await_recv = false,
8888
connection_state = running,
89-
conserve = false,
89+
blocked_by = sets:from_list(Alarms, [{version, 2}]),
9090
parse_state = rabbit_mqtt_packet:init_state(),
9191
stats_timer = rabbit_event:init_stats_timer()},
9292
State = control_throttle(State0),
@@ -185,9 +185,16 @@ handle_info({Tag, Sock, Reason}, State = #state{socket = Sock})
185185
when Tag =:= tcp_error; Tag =:= ssl_error ->
186186
network_error(Reason, State);
187187

188-
handle_info({conserve_resources, Conserve}, State) ->
188+
handle_info({conserve_resources, Source, Conserve},
189+
#state{blocked_by = BlockedBy0} = State) ->
190+
BlockedBy = case Conserve of
191+
true ->
192+
sets:add_element(Source, BlockedBy0);
193+
false ->
194+
sets:del_element(Source, BlockedBy0)
195+
end,
189196
maybe_process_deferred_recv(
190-
control_throttle(State #state{ conserve = Conserve }));
197+
control_throttle(State #state{ blocked_by = BlockedBy }));
191198

192199
handle_info({bump_credit, Msg}, State) ->
193200
credit_flow:handle_bump_msg(Msg),
@@ -417,10 +424,11 @@ run_socket(State = #state{ socket = Sock }) ->
417424
State#state{ await_recv = true }.
418425

419426
control_throttle(State = #state{connection_state = ConnState,
420-
conserve = Conserve,
427+
blocked_by = BlockedBy,
421428
proc_state = PState,
422429
keepalive = KState
423430
}) ->
431+
Conserve = not sets:is_empty(BlockedBy),
424432
Throttle = case PState of
425433
connect_packet_unprocessed -> Conserve;
426434
_ -> rabbit_mqtt_processor:throttle(Conserve, PState)
@@ -537,7 +545,7 @@ format_state(#state{socket = Socket,
537545
parse_state = _,
538546
proc_state = PState,
539547
connection_state = ConnectionState,
540-
conserve = Conserve,
548+
blocked_by = BlockedBy,
541549
stats_timer = StatsTimer,
542550
keepalive = Keepalive,
543551
conn_name = ConnName
@@ -552,7 +560,7 @@ format_state(#state{socket = Socket,
552560
rabbit_mqtt_processor:format_status(PState)
553561
end,
554562
connection_state => ConnectionState,
555-
conserve => Conserve,
563+
blocked_by => lists:sort(sets:to_list(BlockedBy)),
556564
stats_timer => StatsTimer,
557565
keepalive => Keepalive,
558566
conn_name => ConnName}.

deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
parse_state,
3434
processor_state,
3535
state,
36-
conserve_resources,
36+
blocked_by :: sets:set(rabbit_alarm:resource_alarm_source()),
3737
recv_outstanding,
3838
max_frame_size,
3939
current_frame_size,
@@ -82,7 +82,7 @@ init([SupHelperPid, Ref, Configuration]) ->
8282
[self(), ConnName]),
8383

8484
ParseState = rabbit_stomp_frame:initial_state(),
85-
_ = register_resource_alarm(),
85+
Alarms = register_resource_alarm(),
8686

8787
LoginTimeout = application:get_env(rabbitmq_stomp, login_timeout, 10_000),
8888
MaxFrameSize = application:get_env(rabbitmq_stomp, max_frame_size, ?DEFAULT_MAX_FRAME_SIZE),
@@ -100,7 +100,7 @@ init([SupHelperPid, Ref, Configuration]) ->
100100
max_frame_size = MaxFrameSize,
101101
current_frame_size = 0,
102102
state = running,
103-
conserve_resources = false,
103+
blocked_by = sets:from_list(Alarms, [{version, 2}]),
104104
recv_outstanding = false})), #reader_state.stats_timer),
105105
{backoff, 1000, 1000, 10000});
106106
{error, enotconn} ->
@@ -146,8 +146,15 @@ handle_info({Tag, Sock, Reason}, State=#reader_state{socket=Sock})
146146
{stop, {inet_error, Reason}, State};
147147
handle_info(emit_stats, State) ->
148148
{noreply, emit_stats(State), hibernate};
149-
handle_info({conserve_resources, Conserve}, State) ->
150-
NewState = State#reader_state{conserve_resources = Conserve},
149+
handle_info({conserve_resources, Source, Conserve},
150+
#reader_state{blocked_by = BlockedBy0} = State) ->
151+
BlockedBy = case Conserve of
152+
true ->
153+
sets:add_element(Source, BlockedBy0);
154+
false ->
155+
sets:del_element(Source, BlockedBy0)
156+
end,
157+
NewState = State#reader_state{blocked_by = BlockedBy},
151158
{noreply, run_socket(control_throttle(NewState)), hibernate};
152159
handle_info({bump_credit, Msg}, State) ->
153160
credit_flow:handle_bump_msg(Msg),
@@ -288,18 +295,19 @@ process_received_bytes(Bytes,
288295
-spec conserve_resources(pid(),
289296
rabbit_alarm:resource_alarm_source(),
290297
rabbit_alarm:resource_alert()) -> ok.
291-
conserve_resources(Pid, _Source, {_, Conserve, _}) ->
292-
Pid ! {conserve_resources, Conserve},
298+
conserve_resources(Pid, Source, {_, Conserve, _}) ->
299+
Pid ! {conserve_resources, Source, Conserve},
293300
ok.
294301

295302
register_resource_alarm() ->
296303
rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}).
297304

298305

299-
control_throttle(State = #reader_state{state = CS,
300-
conserve_resources = Mem,
306+
control_throttle(State = #reader_state{state = CS,
307+
blocked_by = BlockedBy,
301308
heartbeat = Heartbeat}) ->
302-
case {CS, Mem orelse credit_flow:blocked()} of
309+
Conserve = not sets:is_empty(BlockedBy),
310+
case {CS, Conserve orelse credit_flow:blocked()} of
303311
{running, true} -> State#reader_state{state = blocking};
304312
{blocking, false} -> rabbit_heartbeat:resume_monitor(Heartbeat),
305313
State#reader_state{state = running};

deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
proc_state = connect_packet_unprocessed :: connect_packet_unprocessed |
3939
rabbit_mqtt_processor:state(),
4040
connection_state = running :: running | blocked,
41-
conserve = false :: boolean(),
41+
blocked_by = sets:new([{version, 2}]) :: sets:set(rabbit_alarm:resource_alarm_source()),
4242
stats_timer :: option(rabbit_event:state()),
4343
keepalive = rabbit_mqtt_keepalive:init() :: rabbit_mqtt_keepalive:state(),
4444
conn_name :: option(binary())
@@ -141,8 +141,9 @@ websocket_init(State0 = #state{socket = Sock}) ->
141141
{ok, ConnStr} ->
142142
ConnName = rabbit_data_coercion:to_binary(ConnStr),
143143
?LOG_INFO("Accepting Web MQTT connection ~s", [ConnName]),
144-
_ = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
145-
State = State0#state{conn_name = ConnName},
144+
Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
145+
State = State0#state{conn_name = ConnName,
146+
blocked_by = sets:from_list(Alarms, [{version, 2}])},
146147
process_flag(trap_exit, true),
147148
{[], State, hibernate};
148149
{error, Reason} ->
@@ -152,8 +153,8 @@ websocket_init(State0 = #state{socket = Sock}) ->
152153
-spec conserve_resources(pid(),
153154
rabbit_alarm:resource_alarm_source(),
154155
rabbit_alarm:resource_alert()) -> ok.
155-
conserve_resources(Pid, _, {_, Conserve, _}) ->
156-
Pid ! {conserve_resources, Conserve},
156+
conserve_resources(Pid, Source, {_, Conserve, _}) ->
157+
Pid ! {conserve_resources, Source, Conserve},
157158
ok.
158159

159160
-spec websocket_handle(ping | pong | {text | binary | ping | pong, binary()}, State) ->
@@ -176,8 +177,15 @@ websocket_handle(Frame, State) ->
176177
-spec websocket_info(any(), State) ->
177178
{cowboy_websocket:commands(), State} |
178179
{cowboy_websocket:commands(), State, hibernate}.
179-
websocket_info({conserve_resources, Conserve}, State) ->
180-
handle_credits(State#state{conserve = Conserve});
180+
websocket_info({conserve_resources, Source, Conserve},
181+
#state{blocked_by = BlockedBy0} = State) ->
182+
BlockedBy = case Conserve of
183+
true ->
184+
sets:add_element(Source, BlockedBy0);
185+
false ->
186+
sets:del_element(Source, BlockedBy0)
187+
end,
188+
handle_credits(State#state{blocked_by = BlockedBy});
181189
websocket_info({bump_credit, Msg}, State) ->
182190
credit_flow:handle_bump_msg(Msg),
183191
handle_credits(State);
@@ -402,10 +410,11 @@ handle_credits(State0) ->
402410
{[{active, Active}], State, hibernate}.
403411

404412
control_throttle(State = #state{connection_state = ConnState,
405-
conserve = Conserve,
413+
blocked_by = BlockedBy,
406414
proc_state = PState,
407415
keepalive = KState
408416
}) ->
417+
Conserve = not sets:is_empty(BlockedBy),
409418
Throttle = case PState of
410419
connect_packet_unprocessed -> Conserve;
411420
_ -> rabbit_mqtt_processor:throttle(Conserve, PState)

0 commit comments

Comments
 (0)