Skip to content

Commit 3e7bd56

Browse files
committed
Force-delete queues, which have no live master or slave processes.
Fixes #1501 [#155801556] If a queue is configured to not be promoted (via ha-promote-on-shutdown: when-synced) queue.delete can hang. Make it check for process existense first and force-delete if no master of slave processes are running. Do not force-delete if if_empty is set, since there is no way to check that the queue is empty.
1 parent 500d316 commit 3e7bd56

File tree

2 files changed

+87
-2
lines changed

2 files changed

+87
-2
lines changed

src/rabbit_amqqueue.erl

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -835,8 +835,53 @@ delete_immediately(QPids) ->
835835
[gen_server2:cast(QPid, delete_immediately) || QPid <- QPids],
836836
ok.
837837

838-
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty, ActingUser) ->
839-
delegate:invoke(QPid, {gen_server2, call, [{delete, IfUnused, IfEmpty, ActingUser}, infinity]}).
838+
delete(Q, IfUnused, IfEmpty, ActingUser) ->
839+
case wait_for_promoted_or_stopped(Q) of
840+
{promoted, Q1 = #amqqueue{pid = QPid}} ->
841+
delegate:invoke(QPid, {gen_server2, call, [{delete, IfUnused, IfEmpty, ActingUser}, infinity]});
842+
{stopped, Q1} ->
843+
#resource{name = Name, virtual_host = Vhost} = Q1#amqqueue.name,
844+
case IfEmpty of
845+
true ->
846+
rabbit_log:error("Queue ~s on vhost ~s master node is down. "
847+
"Unable to check if the queue is empty. "
848+
"Delete failed",
849+
[Name, Vhost]),
850+
{error, not_empty};
851+
false ->
852+
rabbit_log:warning("Queue ~s on vhost ~s master node is down. "
853+
"Force-deleting the queue",
854+
[Name, Vhost]),
855+
delete_crashed_internal(Q1, ActingUser),
856+
{ok, 0}
857+
end;
858+
{error, not_found} ->
859+
%% Assume the queue was deleted
860+
{ok, 0}
861+
end.
862+
863+
-spec wait_for_promoted_or_stopped(#amqqueue{}) -> {promoted, #amqqueue{}} | {stopped, #amqqueue{}} | {error, not_found}.
864+
wait_for_promoted_or_stopped(#amqqueue{name = QName}) ->
865+
case lookup(QName) of
866+
{ok, Q = #amqqueue{pid = QPid, slave_pids = SPids}} ->
867+
case rabbit_mnesia:is_process_alive(QPid) of
868+
true -> {promoted, Q};
869+
false ->
870+
case lists:any(fun(Pid) ->
871+
rabbit_mnesia:is_process_alive(Pid)
872+
end, SPids) of
873+
%% There is a live slave. May be promoted
874+
true ->
875+
timer:sleep(100),
876+
wait_for_promoted_or_stopped(Q);
877+
%% All slave pids are stopped.
878+
%% No process left for the queue
879+
false -> {stopped, Q}
880+
end
881+
end;
882+
{error, not_found} ->
883+
{error, not_found}
884+
end.
840885

841886
delete_crashed(Q) ->
842887
delete_crashed(Q, ?INTERNAL_USER).

test/dynamic_ha_SUITE.erl

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ groups() ->
5757
{clustered, [], [
5858
{cluster_size_2, [], [
5959
vhost_deletion,
60+
force_delete_if_no_master,
6061
promote_on_shutdown,
6162
slave_recovers_after_vhost_failure,
6263
slave_recovers_after_vhost_down_an_up,
@@ -247,6 +248,45 @@ vhost_deletion(Config) ->
247248
ok = rpc:call(A, rabbit_vhost, delete, [<<"/">>, <<"acting-user">>]),
248249
ok.
249250

251+
force_delete_if_no_master(Config) ->
252+
[A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
253+
rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.nopromote">>,
254+
<<"all">>),
255+
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
256+
[begin
257+
amqp_channel:call(ACh, #'queue.declare'{queue = Q,
258+
durable = true}),
259+
rabbit_ct_client_helpers:publish(ACh, Q, 10)
260+
end || Q <- [<<"ha.nopromote.test1">>, <<"ha.nopromote.test2">>]],
261+
ok = rabbit_ct_broker_helpers:restart_node(Config, B),
262+
ok = rabbit_ct_broker_helpers:stop_node(Config, A),
263+
264+
BCh = rabbit_ct_client_helpers:open_channel(Config, B),
265+
?assertExit(
266+
{{shutdown, {server_initiated_close, 404, _}}, _},
267+
amqp_channel:call(
268+
BCh, #'queue.declare'{queue = <<"ha.nopromote.test1">>,
269+
durable = true})),
270+
271+
BCh1 = rabbit_ct_client_helpers:open_channel(Config, B),
272+
?assertExit(
273+
{{shutdown, {server_initiated_close, 404, _}}, _},
274+
amqp_channel:call(
275+
BCh1, #'queue.declare'{queue = <<"ha.nopromote.test2">>,
276+
durable = true})),
277+
BCh2 = rabbit_ct_client_helpers:open_channel(Config, B),
278+
#'queue.delete_ok'{} =
279+
amqp_channel:call(BCh2, #'queue.delete'{queue = <<"ha.nopromote.test1">>}),
280+
%% Delete with if_empty will fail, since we don't know if the queue is empty
281+
?assertExit(
282+
{{shutdown, {server_initiated_close, 406, _}}, _},
283+
amqp_channel:call(BCh2, #'queue.delete'{queue = <<"ha.nopromote.test2">>,
284+
if_empty = true})),
285+
BCh3 = rabbit_ct_client_helpers:open_channel(Config, B),
286+
#'queue.delete_ok'{} =
287+
amqp_channel:call(BCh3, #'queue.delete'{queue = <<"ha.nopromote.test2">>}),
288+
ok.
289+
250290
promote_on_shutdown(Config) ->
251291
[A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
252292
rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.promote">>,

0 commit comments

Comments
 (0)