Skip to content

Commit f239eaf

Browse files
committed
Publisher confirms test suite for all queue types
Some tests have been moved from another suites to here and made generic. New tests.
1 parent 9166955 commit f239eaf

File tree

3 files changed

+381
-103
lines changed

3 files changed

+381
-103
lines changed

test/quorum_queue_SUITE.erl

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,6 @@ all_tests() ->
114114
subscribe_and_nack,
115115
subscribe_and_multiple_nack,
116116
subscribe_should_fail_when_global_qos_true,
117-
publisher_confirms,
118-
publisher_confirms_with_deleted_queue,
119117
dead_letter_to_classic_queue,
120118
dead_letter_to_quorum_queue,
121119
dead_letter_from_classic_to_quorum_queue,
@@ -1123,41 +1121,6 @@ subscribe_and_multiple_nack(Config) ->
11231121
wait_for_messages_pending_ack(Servers, RaName, 0)
11241122
end.
11251123

1126-
publisher_confirms(Config) ->
1127-
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1128-
1129-
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
1130-
QQ = ?config(queue_name, Config),
1131-
?assertEqual({'queue.declare_ok', QQ, 0, 0},
1132-
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
1133-
1134-
RaName = ra_name(QQ),
1135-
amqp_channel:call(Ch, #'confirm.select'{}),
1136-
amqp_channel:register_confirm_handler(Ch, self()),
1137-
publish(Ch, QQ),
1138-
wait_for_messages_ready(Servers, RaName, 1),
1139-
wait_for_messages_pending_ack(Servers, RaName, 0),
1140-
ct:pal("WAIT FOR CONFIRMS ~n", []),
1141-
amqp_channel:wait_for_confirms(Ch, 5000),
1142-
amqp_channel:unregister_confirm_handler(Ch),
1143-
ok.
1144-
1145-
publisher_confirms_with_deleted_queue(Config) ->
1146-
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1147-
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
1148-
QQ = ?config(queue_name, Config),
1149-
?assertEqual({'queue.declare_ok', QQ, 0, 0},
1150-
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
1151-
amqp_channel:call(Ch, #'confirm.select'{}),
1152-
amqp_channel:register_confirm_handler(Ch, self()),
1153-
% subscribe(Ch, QQ, false),
1154-
publish(Ch, QQ),
1155-
delete_queues(Ch, [QQ]),
1156-
ct:pal("WAIT FOR CONFIRMS ~n", []),
1157-
amqp_channel:wait_for_confirms_or_die(Ch, 5000),
1158-
amqp_channel:unregister_confirm_handler(Ch),
1159-
ok.
1160-
11611124
dead_letter_to_classic_queue(Config) ->
11621125
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
11631126

test/unit_inbroker_parallel_SUITE.erl

Lines changed: 0 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ groups() ->
4848
{parallel_tests, [parallel], [
4949
amqp_connection_refusal,
5050
configurable_server_properties,
51-
confirms,
5251
credit_flow_settings,
5352
dynamic_mirroring,
5453
gen_server2_with_state,
@@ -931,71 +930,6 @@ test_topic_expect_match(X, List) ->
931930
%% ---------------------------------------------------------------------------
932931
%% Unordered tests (originally from rabbit_tests.erl).
933932
%% ---------------------------------------------------------------------------
934-
935-
confirms(Config) ->
936-
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
937-
?MODULE, confirms1, [Config]).
938-
939-
confirms1(_Config) ->
940-
{_Writer, Ch} = test_spawn(),
941-
DeclareBindDurableQueue =
942-
fun() ->
943-
rabbit_channel:do(Ch, #'queue.declare'{durable = true}),
944-
receive #'queue.declare_ok'{queue = Q0} ->
945-
rabbit_channel:do(Ch, #'queue.bind'{
946-
queue = Q0,
947-
exchange = <<"amq.direct">>,
948-
routing_key = "confirms-magic" }),
949-
receive #'queue.bind_ok'{} -> Q0
950-
after ?TIMEOUT -> throw(failed_to_bind_queue)
951-
end
952-
after ?TIMEOUT -> throw(failed_to_declare_queue)
953-
end
954-
end,
955-
%% Declare and bind two queues
956-
QName1 = DeclareBindDurableQueue(),
957-
QName2 = DeclareBindDurableQueue(),
958-
%% Get the first one's pid (we'll crash it later)
959-
{ok, Q1} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName1)),
960-
QPid1 = amqqueue:get_pid(Q1),
961-
%% Enable confirms
962-
rabbit_channel:do(Ch, #'confirm.select'{}),
963-
receive
964-
#'confirm.select_ok'{} -> ok
965-
after ?TIMEOUT -> throw(failed_to_enable_confirms)
966-
end,
967-
%% Publish a message
968-
rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"amq.direct">>,
969-
routing_key = "confirms-magic"
970-
},
971-
rabbit_basic:build_content(
972-
#'P_basic'{delivery_mode = 2}, <<"">>)),
973-
%% We must not kill the queue before the channel has processed the
974-
%% 'publish'.
975-
ok = rabbit_channel:flush(Ch),
976-
%% Crash the queue
977-
QPid1 ! boom,
978-
%% Wait for a nack
979-
receive
980-
#'basic.nack'{} -> ok;
981-
#'basic.ack'{} -> throw(received_ack_instead_of_nack)
982-
after ?TIMEOUT-> throw(did_not_receive_nack)
983-
end,
984-
receive
985-
#'basic.ack'{} -> throw(received_ack_when_none_expected)
986-
after 1000 -> ok
987-
end,
988-
%% Cleanup
989-
rabbit_channel:do(Ch, #'queue.delete'{queue = QName2}),
990-
receive
991-
#'queue.delete_ok'{} -> ok
992-
after ?TIMEOUT -> throw(failed_to_cleanup_queue)
993-
end,
994-
unlink(Ch),
995-
ok = rabbit_channel:shutdown(Ch),
996-
997-
passed.
998-
999933
gen_server2_with_state(Config) ->
1000934
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1001935
?MODULE, gen_server2_with_state1, [Config]).

0 commit comments

Comments
 (0)