@@ -92,7 +92,9 @@ groups() ->
9292 format ,
9393 add_member_2 ,
9494 single_active_consumer_priority_take_over ,
95- single_active_consumer_priority
95+ single_active_consumer_priority ,
96+ force_shrink_member_to_current_member ,
97+ force_all_queues_shrink_member_to_current_member
9698 ]
9799 ++ all_tests ()},
98100 {cluster_size_5 , [], [start_queue ,
@@ -1152,6 +1154,85 @@ single_active_consumer_priority(Config) ->
11521154 rpc :call (Server0 , ra , local_query , [RaNameQ3 , QueryFun ])),
11531155 ok .
11541156
1157+ force_shrink_member_to_current_member (Config ) ->
1158+ [Server0 , Server1 , Server2 ] =
1159+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1160+
1161+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1162+ QQ = ? config (queue_name , Config ),
1163+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1164+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1165+
1166+ RaName = ra_name (QQ ),
1167+ rabbit_ct_client_helpers :publish (Ch , QQ , 3 ),
1168+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1169+
1170+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1171+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1172+ ? assertEqual (3 , length (Nodes0 )),
1173+
1174+ rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1175+ force_shrink_member_to_current_member , [<<" /" >>, QQ ]),
1176+
1177+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1178+
1179+ {ok , Q1 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1180+ #{nodes := Nodes1 } = amqqueue :get_type_state (Q1 ),
1181+ ? assertEqual (1 , length (Nodes1 )),
1182+
1183+ % % grow queues back to all nodes
1184+ [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , <<" /" >>, <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
1185+
1186+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1187+ {ok , Q2 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1188+ #{nodes := Nodes2 } = amqqueue :get_type_state (Q2 ),
1189+ ? assertEqual (3 , length (Nodes2 )).
1190+
1191+ force_all_queues_shrink_member_to_current_member (Config ) ->
1192+ [Server0 , Server1 , Server2 ] =
1193+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1194+
1195+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1196+ QQ = ? config (queue_name , Config ),
1197+ AQ = ? config (alt_queue_name , Config ),
1198+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1199+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1200+ ? assertEqual ({'queue.declare_ok' , AQ , 0 , 0 },
1201+ declare (Ch , AQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1202+
1203+ QQs = [QQ , AQ ],
1204+
1205+ [begin
1206+ RaName = ra_name (Q ),
1207+ rabbit_ct_client_helpers :publish (Ch , Q , 3 ),
1208+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1209+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1210+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1211+ ? assertEqual (3 , length (Nodes0 ))
1212+ end || Q <- QQs ],
1213+
1214+ rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1215+ force_all_queues_shrink_member_to_current_member , []),
1216+
1217+ [begin
1218+ RaName = ra_name (Q ),
1219+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1220+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1221+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1222+ ? assertEqual (1 , length (Nodes0 ))
1223+ end || Q <- QQs ],
1224+
1225+ % % grow queues back to all nodes
1226+ [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , <<" /" >>, <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
1227+
1228+ [begin
1229+ RaName = ra_name (Q ),
1230+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1231+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1232+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1233+ ? assertEqual (3 , length (Nodes0 ))
1234+ end || Q <- QQs ].
1235+
11551236priority_queue_fifo (Config ) ->
11561237 % % testing: if hi priority messages are published before lo priority
11571238 % % messages they are always consumed first (fifo)
0 commit comments