From 18950d012bd33e7e829c2524be1542d82728ce5f Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Fri, 6 Sep 2024 16:49:54 +0100 Subject: [PATCH 1/5] Shutdown peer QQ FSMs on connected nodes on force-shrink execution for cluster wide consistency, ensuring only the leader is active/running (cherry picked from commit b675ce29f022bc9d46f20ef32e065b0bb9684c8b) (cherry picked from commit d9de6d989c7b8482f410e75d6791cdc1dc3ec8db) # Conflicts: # deps/rabbit/src/rabbit_quorum_queue.erl --- deps/rabbit/src/rabbit_quorum_queue.erl | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 94e2148aec31..b5f4e093dd76 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -1969,7 +1969,11 @@ force_shrink_member_to_current_member(VHost, Name) -> end, _ = rabbit_amqqueue:update(QName, Fun), _ = [ra:force_delete_server(?RA_SYSTEM, {RaName, N}) || N <- OtherNodes], +<<<<<<< HEAD rabbit_log:warning("Shrinking ~ts finished", [QNameFmt]); +======= + rabbit_log:warning("Disaster recovery procedure: shrinking finished"); +>>>>>>> d9de6d989c (Shutdown peer QQ FSMs on connected nodes on force-shrink execution for cluster) _ -> rabbit_log:warning("Shrinking failed, ~ts not found", [QNameFmt]), {error, not_found} @@ -1991,7 +1995,11 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis QName = amqqueue:get_name(Q), {RaName, _} = amqqueue:get_pid(Q), OtherNodes = lists:delete(Node, get_nodes(Q)), +<<<<<<< HEAD rabbit_log:warning("Shrinking queue ~ts to a single node: ~ts", [rabbit_misc:rs(QName), Node]), +======= + rabbit_log:warning("Disaster recovery procedure: shrinking queue ~p", [QName]), +>>>>>>> d9de6d989c (Shutdown peer QQ FSMs on connected nodes on force-shrink execution for cluster) ok = ra_server_proc:force_shrink_members_to_current_member({RaName, Node}), Fun = fun (QQ) -> TS0 = amqqueue:get_type_state(QQ), @@ -2000,8 +2008,13 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis end, _ = rabbit_amqqueue:update(QName, Fun), _ = [ra:force_delete_server(?RA_SYSTEM, {RaName, N}) || N <- OtherNodes] +<<<<<<< HEAD end || Q <- ListQQFun(), amqqueue:get_type(Q) == ?MODULE], rabbit_log:warning("Shrinking finished"), +======= + end || Q <- rabbit_amqqueue:list(), amqqueue:get_type(Q) == ?MODULE], + rabbit_log:warning("Disaster recovery procedure: shrinking finished"), +>>>>>>> d9de6d989c (Shutdown peer QQ FSMs on connected nodes on force-shrink execution for cluster) ok. is_minority(All, Up) -> From 92bc7ac1a80cc0c66239a5f26a13182d2f2f4590 Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Tue, 1 Oct 2024 15:16:16 +0100 Subject: [PATCH 2/5] QQ tests for force-shrink to current member operations (cherry picked from commit 60ee35ea7e269dca3eecc84a68fd1a5feaa64ec2) (cherry picked from commit 10dbde1f7102fb53d99c9c27ffc6ad7ff0094f8b) # Conflicts: # deps/rabbit/test/quorum_queue_SUITE.erl --- deps/rabbit/test/quorum_queue_SUITE.erl | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 2dfbe5d3fc68..4e52639d92d6 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -94,8 +94,12 @@ groups() -> single_active_consumer_priority_take_over, single_active_consumer_priority, force_shrink_member_to_current_member, +<<<<<<< HEAD force_all_queues_shrink_member_to_current_member, force_vhost_queues_shrink_member_to_current_member +======= + force_all_queues_shrink_member_to_current_member +>>>>>>> 10dbde1f71 (QQ tests for force-shrink to current member operations) ] ++ all_tests()}, {cluster_size_5, [], [start_queue, @@ -1236,6 +1240,7 @@ force_all_queues_shrink_member_to_current_member(Config) -> ?assertEqual(3, length(Nodes0)) end || Q <- QQs]. +<<<<<<< HEAD force_vhost_queues_shrink_member_to_current_member(Config) -> [Server0, Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1302,6 +1307,8 @@ force_vhost_queues_shrink_member_to_current_member(Config) -> ?assertEqual(3, length(Nodes0)) end || Q <- QQs, VHost <- VHosts]. +======= +>>>>>>> 10dbde1f71 (QQ tests for force-shrink to current member operations) priority_queue_fifo(Config) -> %% testing: if hi priority messages are published before lo priority %% messages they are always consumed first (fifo) From bad78a620fd6a71daae673c9fc789541550445d2 Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Wed, 2 Oct 2024 11:28:41 +0100 Subject: [PATCH 3/5] Implement force_vhost_queues_shrink_member_to_current_member/1 (cherry picked from commit c26aa3b1c76dd82fb9e67852b6a2f030a92bf7cc) (cherry picked from commit b03637f8ecbcbf8085100c809cde59abad760d70) # Conflicts: # deps/rabbit/src/rabbit_quorum_queue.erl --- deps/rabbit/src/rabbit_quorum_queue.erl | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index b5f4e093dd76..2c764af71488 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -1980,12 +1980,20 @@ force_shrink_member_to_current_member(VHost, Name) -> end. force_vhost_queues_shrink_member_to_current_member(VHost) when is_binary(VHost) -> +<<<<<<< HEAD rabbit_log:warning("Shrinking all quorum queues in vhost '~ts' to a single node: ~ts", [VHost, node()]), +======= + rabbit_log:warning("Disaster recovery procedure: shrinking all quorum queues in vhost ~tp to a single node cluster", [VHost]), +>>>>>>> b03637f8ec (Implement force_vhost_queues_shrink_member_to_current_member/1) ListQQs = fun() -> rabbit_amqqueue:list(VHost) end, force_all_queues_shrink_member_to_current_member(ListQQs). force_all_queues_shrink_member_to_current_member() -> +<<<<<<< HEAD rabbit_log:warning("Shrinking all quorum queues to a single node: ~ts", [node()]), +======= + rabbit_log:warning("Disaster recovery procedure: shrinking all quorum queues to a single node cluster"), +>>>>>>> b03637f8ec (Implement force_vhost_queues_shrink_member_to_current_member/1) ListQQs = fun() -> rabbit_amqqueue:list() end, force_all_queues_shrink_member_to_current_member(ListQQs). @@ -2008,11 +2016,15 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis end, _ = rabbit_amqqueue:update(QName, Fun), _ = [ra:force_delete_server(?RA_SYSTEM, {RaName, N}) || N <- OtherNodes] +<<<<<<< HEAD <<<<<<< HEAD end || Q <- ListQQFun(), amqqueue:get_type(Q) == ?MODULE], rabbit_log:warning("Shrinking finished"), ======= end || Q <- rabbit_amqqueue:list(), amqqueue:get_type(Q) == ?MODULE], +======= + end || Q <- ListQQFun(), amqqueue:get_type(Q) == ?MODULE], +>>>>>>> b03637f8ec (Implement force_vhost_queues_shrink_member_to_current_member/1) rabbit_log:warning("Disaster recovery procedure: shrinking finished"), >>>>>>> d9de6d989c (Shutdown peer QQ FSMs on connected nodes on force-shrink execution for cluster) ok. From 7917a8a6d54f161e8a67da25d0e8870ba7b62bf6 Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Wed, 2 Oct 2024 12:28:18 +0100 Subject: [PATCH 4/5] Add test for QQ force_vhost_queues_shrink_member_to_current_member/1 (cherry picked from commit de0c0dbd89b7c278a1145833cbeda6b7d3de34eb) (cherry picked from commit c9d97e61de062ca05b060b7ab327837e54e226d2) # Conflicts: # deps/rabbit/test/quorum_queue_SUITE.erl --- deps/rabbit/test/quorum_queue_SUITE.erl | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 4e52639d92d6..dd64dceebdd0 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -94,12 +94,17 @@ groups() -> single_active_consumer_priority_take_over, single_active_consumer_priority, force_shrink_member_to_current_member, +<<<<<<< HEAD <<<<<<< HEAD force_all_queues_shrink_member_to_current_member, force_vhost_queues_shrink_member_to_current_member ======= force_all_queues_shrink_member_to_current_member >>>>>>> 10dbde1f71 (QQ tests for force-shrink to current member operations) +======= + force_all_queues_shrink_member_to_current_member, + force_vhost_queues_shrink_member_to_current_member +>>>>>>> c9d97e61de (Add test for QQ force_vhost_queues_shrink_member_to_current_member/1) ] ++ all_tests()}, {cluster_size_5, [], [start_queue, @@ -1241,6 +1246,9 @@ force_all_queues_shrink_member_to_current_member(Config) -> end || Q <- QQs]. <<<<<<< HEAD +<<<<<<< HEAD +======= +>>>>>>> c9d97e61de (Add test for QQ force_vhost_queues_shrink_member_to_current_member/1) force_vhost_queues_shrink_member_to_current_member(Config) -> [Server0, Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1307,8 +1315,11 @@ force_vhost_queues_shrink_member_to_current_member(Config) -> ?assertEqual(3, length(Nodes0)) end || Q <- QQs, VHost <- VHosts]. +<<<<<<< HEAD ======= >>>>>>> 10dbde1f71 (QQ tests for force-shrink to current member operations) +======= +>>>>>>> c9d97e61de (Add test for QQ force_vhost_queues_shrink_member_to_current_member/1) priority_queue_fifo(Config) -> %% testing: if hi priority messages are published before lo priority %% messages they are always consumed first (fifo) From ea60dae6c60364fc4b547b08d898778a773b5a0a Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Thu, 3 Oct 2024 10:57:11 +0100 Subject: [PATCH 5/5] Update QQ force-shrink logging (cherry picked from commit dd5ec3ccc0715ac47c2dc2f82191263bfc860204) (cherry picked from commit 16170d093bac1df4766ba36e17c50f281ba2466c) # Conflicts: # deps/rabbit/src/rabbit_quorum_queue.erl --- deps/rabbit/src/rabbit_quorum_queue.erl | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 2c764af71488..0020ae2efdb1 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -1969,31 +1969,43 @@ force_shrink_member_to_current_member(VHost, Name) -> end, _ = rabbit_amqqueue:update(QName, Fun), _ = [ra:force_delete_server(?RA_SYSTEM, {RaName, N}) || N <- OtherNodes], +<<<<<<< HEAD <<<<<<< HEAD rabbit_log:warning("Shrinking ~ts finished", [QNameFmt]); ======= rabbit_log:warning("Disaster recovery procedure: shrinking finished"); >>>>>>> d9de6d989c (Shutdown peer QQ FSMs on connected nodes on force-shrink execution for cluster) +======= + rabbit_log:warning("Shrinking ~ts finished", [QNameFmt]); +>>>>>>> 16170d093b (Update QQ force-shrink logging) _ -> rabbit_log:warning("Shrinking failed, ~ts not found", [QNameFmt]), {error, not_found} end. force_vhost_queues_shrink_member_to_current_member(VHost) when is_binary(VHost) -> +<<<<<<< HEAD <<<<<<< HEAD rabbit_log:warning("Shrinking all quorum queues in vhost '~ts' to a single node: ~ts", [VHost, node()]), ======= rabbit_log:warning("Disaster recovery procedure: shrinking all quorum queues in vhost ~tp to a single node cluster", [VHost]), >>>>>>> b03637f8ec (Implement force_vhost_queues_shrink_member_to_current_member/1) +======= + rabbit_log:warning("Shrinking all quorum queues in vhost '~ts' to a single node: ~ts", [VHost, node()]), +>>>>>>> 16170d093b (Update QQ force-shrink logging) ListQQs = fun() -> rabbit_amqqueue:list(VHost) end, force_all_queues_shrink_member_to_current_member(ListQQs). force_all_queues_shrink_member_to_current_member() -> +<<<<<<< HEAD <<<<<<< HEAD rabbit_log:warning("Shrinking all quorum queues to a single node: ~ts", [node()]), ======= rabbit_log:warning("Disaster recovery procedure: shrinking all quorum queues to a single node cluster"), >>>>>>> b03637f8ec (Implement force_vhost_queues_shrink_member_to_current_member/1) +======= + rabbit_log:warning("Shrinking all quorum queues to a single node: ~ts", [node()]), +>>>>>>> 16170d093b (Update QQ force-shrink logging) ListQQs = fun() -> rabbit_amqqueue:list() end, force_all_queues_shrink_member_to_current_member(ListQQs). @@ -2003,11 +2015,15 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis QName = amqqueue:get_name(Q), {RaName, _} = amqqueue:get_pid(Q), OtherNodes = lists:delete(Node, get_nodes(Q)), +<<<<<<< HEAD <<<<<<< HEAD rabbit_log:warning("Shrinking queue ~ts to a single node: ~ts", [rabbit_misc:rs(QName), Node]), ======= rabbit_log:warning("Disaster recovery procedure: shrinking queue ~p", [QName]), >>>>>>> d9de6d989c (Shutdown peer QQ FSMs on connected nodes on force-shrink execution for cluster) +======= + rabbit_log:warning("Shrinking queue ~ts to a single node: ~ts", [rabbit_misc:rs(QName), Node]), +>>>>>>> 16170d093b (Update QQ force-shrink logging) ok = ra_server_proc:force_shrink_members_to_current_member({RaName, Node}), Fun = fun (QQ) -> TS0 = amqqueue:get_type_state(QQ), @@ -2020,6 +2036,7 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis <<<<<<< HEAD end || Q <- ListQQFun(), amqqueue:get_type(Q) == ?MODULE], rabbit_log:warning("Shrinking finished"), +<<<<<<< HEAD ======= end || Q <- rabbit_amqqueue:list(), amqqueue:get_type(Q) == ?MODULE], ======= @@ -2027,6 +2044,8 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis >>>>>>> b03637f8ec (Implement force_vhost_queues_shrink_member_to_current_member/1) rabbit_log:warning("Disaster recovery procedure: shrinking finished"), >>>>>>> d9de6d989c (Shutdown peer QQ FSMs on connected nodes on force-shrink execution for cluster) +======= +>>>>>>> 16170d093b (Update QQ force-shrink logging) ok. is_minority(All, Up) ->