Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1969,19 +1969,43 @@ force_shrink_member_to_current_member(VHost, Name) ->
end,
_ = rabbit_amqqueue:update(QName, Fun),
_ = [ra:force_delete_server(?RA_SYSTEM, {RaName, N}) || N <- OtherNodes],
<<<<<<< HEAD
<<<<<<< HEAD
rabbit_log:warning("Shrinking ~ts finished", [QNameFmt]);
=======
rabbit_log:warning("Disaster recovery procedure: shrinking finished");
>>>>>>> d9de6d989c (Shutdown peer QQ FSMs on connected nodes on force-shrink execution for cluster)
=======
rabbit_log:warning("Shrinking ~ts finished", [QNameFmt]);
>>>>>>> 16170d093b (Update QQ force-shrink logging)
_ ->
rabbit_log:warning("Shrinking failed, ~ts not found", [QNameFmt]),
{error, not_found}
end.

force_vhost_queues_shrink_member_to_current_member(VHost) when is_binary(VHost) ->
<<<<<<< HEAD
<<<<<<< HEAD
rabbit_log:warning("Shrinking all quorum queues in vhost '~ts' to a single node: ~ts", [VHost, node()]),
=======
rabbit_log:warning("Disaster recovery procedure: shrinking all quorum queues in vhost ~tp to a single node cluster", [VHost]),
>>>>>>> b03637f8ec (Implement force_vhost_queues_shrink_member_to_current_member/1)
=======
rabbit_log:warning("Shrinking all quorum queues in vhost '~ts' to a single node: ~ts", [VHost, node()]),
>>>>>>> 16170d093b (Update QQ force-shrink logging)
ListQQs = fun() -> rabbit_amqqueue:list(VHost) end,
force_all_queues_shrink_member_to_current_member(ListQQs).

force_all_queues_shrink_member_to_current_member() ->
<<<<<<< HEAD
<<<<<<< HEAD
rabbit_log:warning("Shrinking all quorum queues to a single node: ~ts", [node()]),
=======
rabbit_log:warning("Disaster recovery procedure: shrinking all quorum queues to a single node cluster"),
>>>>>>> b03637f8ec (Implement force_vhost_queues_shrink_member_to_current_member/1)
=======
rabbit_log:warning("Shrinking all quorum queues to a single node: ~ts", [node()]),
>>>>>>> 16170d093b (Update QQ force-shrink logging)
ListQQs = fun() -> rabbit_amqqueue:list() end,
force_all_queues_shrink_member_to_current_member(ListQQs).

Expand All @@ -1991,7 +2015,15 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis
QName = amqqueue:get_name(Q),
{RaName, _} = amqqueue:get_pid(Q),
OtherNodes = lists:delete(Node, get_nodes(Q)),
<<<<<<< HEAD
<<<<<<< HEAD
rabbit_log:warning("Shrinking queue ~ts to a single node: ~ts", [rabbit_misc:rs(QName), Node]),
=======
rabbit_log:warning("Disaster recovery procedure: shrinking queue ~p", [QName]),
>>>>>>> d9de6d989c (Shutdown peer QQ FSMs on connected nodes on force-shrink execution for cluster)
=======
rabbit_log:warning("Shrinking queue ~ts to a single node: ~ts", [rabbit_misc:rs(QName), Node]),
>>>>>>> 16170d093b (Update QQ force-shrink logging)
ok = ra_server_proc:force_shrink_members_to_current_member({RaName, Node}),
Fun = fun (QQ) ->
TS0 = amqqueue:get_type_state(QQ),
Expand All @@ -2000,8 +2032,20 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis
end,
_ = rabbit_amqqueue:update(QName, Fun),
_ = [ra:force_delete_server(?RA_SYSTEM, {RaName, N}) || N <- OtherNodes]
<<<<<<< HEAD
<<<<<<< HEAD
end || Q <- ListQQFun(), amqqueue:get_type(Q) == ?MODULE],
rabbit_log:warning("Shrinking finished"),
<<<<<<< HEAD
=======
end || Q <- rabbit_amqqueue:list(), amqqueue:get_type(Q) == ?MODULE],
=======
end || Q <- ListQQFun(), amqqueue:get_type(Q) == ?MODULE],
>>>>>>> b03637f8ec (Implement force_vhost_queues_shrink_member_to_current_member/1)
rabbit_log:warning("Disaster recovery procedure: shrinking finished"),
>>>>>>> d9de6d989c (Shutdown peer QQ FSMs on connected nodes on force-shrink execution for cluster)
=======
>>>>>>> 16170d093b (Update QQ force-shrink logging)
ok.

is_minority(All, Up) ->
Expand Down
18 changes: 18 additions & 0 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,17 @@ groups() ->
single_active_consumer_priority_take_over,
single_active_consumer_priority,
force_shrink_member_to_current_member,
<<<<<<< HEAD
<<<<<<< HEAD
force_all_queues_shrink_member_to_current_member,
force_vhost_queues_shrink_member_to_current_member
=======
force_all_queues_shrink_member_to_current_member
>>>>>>> 10dbde1f71 (QQ tests for force-shrink to current member operations)
=======
force_all_queues_shrink_member_to_current_member,
force_vhost_queues_shrink_member_to_current_member
>>>>>>> c9d97e61de (Add test for QQ force_vhost_queues_shrink_member_to_current_member/1)
]
++ all_tests()},
{cluster_size_5, [], [start_queue,
Expand Down Expand Up @@ -1236,6 +1245,10 @@ force_all_queues_shrink_member_to_current_member(Config) ->
?assertEqual(3, length(Nodes0))
end || Q <- QQs].

<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> c9d97e61de (Add test for QQ force_vhost_queues_shrink_member_to_current_member/1)
force_vhost_queues_shrink_member_to_current_member(Config) ->
[Server0, Server1, Server2] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Expand Down Expand Up @@ -1302,6 +1315,11 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
?assertEqual(3, length(Nodes0))
end || Q <- QQs, VHost <- VHosts].

<<<<<<< HEAD
=======
>>>>>>> 10dbde1f71 (QQ tests for force-shrink to current member operations)
=======
>>>>>>> c9d97e61de (Add test for QQ force_vhost_queues_shrink_member_to_current_member/1)
priority_queue_fifo(Config) ->
%% testing: if hi priority messages are published before lo priority
%% messages they are always consumed first (fifo)
Expand Down
Loading