From 4ec2f21dbe3758291ab026ee17effbe3d06ed230 Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Thu, 16 Oct 2025 14:31:42 +0200 Subject: [PATCH 1/4] Shovel: more common testcases --- .../test/amqp091_dynamic_SUITE.erl | 414 +-------- .../test/amqp091_local_dynamic_SUITE.erl | 778 +++++++++++++++++ .../test/amqp10_dynamic_SUITE.erl | 68 +- .../test/local_dynamic_SUITE.erl | 798 ------------------ .../test/shovel_dynamic_SUITE.erl | 241 +++++- .../test/shovel_test_utils.erl | 65 +- 6 files changed, 1104 insertions(+), 1260 deletions(-) create mode 100644 deps/rabbitmq_shovel/test/amqp091_local_dynamic_SUITE.erl diff --git a/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl index 39044fefdd3..aee6a990c60 100644 --- a/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl @@ -13,7 +13,10 @@ -import(rabbit_ct_helpers, [eventually/1]). -import(shovel_test_utils, [await_autodelete/2, invalid_param/2, invalid_param/3, - valid_param/2, valid_param/3]). + valid_param/2, valid_param/3, + with_amqp091_ch/2, amqp091_publish_expect/5, + amqp091_publish/4, amqp091_expect_empty/2, + amqp091_expect/3]). -compile(export_all). @@ -22,7 +25,6 @@ all() -> [ {group, core_tests}, - {group, core_tests_with_preclared_topology}, {group, quorum_queue_tests}, {group, stream_queue_tests} ]. @@ -35,23 +37,12 @@ groups() -> set_empty_properties_using_proplist, set_empty_properties_using_map, headers, - exchange, - missing_dest_exchange, restart, - change_definition, - autodelete, validation, security_validation, get_connection_name, - credit_flow, - missing_src_queue_with_src_predeclared, - missing_dest_queue_with_dest_predeclared + credit_flow ]}, - {core_tests_with_preclared_topology, [], [ - missing_src_queue_without_src_predeclared, - missing_dest_queue_without_dest_predeclared, - missing_src_and_dest_queue_with_false_src_and_dest_predeclared - ]}, {quorum_queue_tests, [], [ quorum_queues ]}, @@ -93,18 +84,9 @@ init_per_group(stream_queue_tests, Config) -> false -> Config; _ -> {skip, "stream queue tests are skipped in mixed mode"} end; -init_per_group(core_tests_with_preclared_topology, Config) -> - ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, - [rabbitmq_shovel, topology, [{predeclared, true}]]), - Config; - init_per_group(_, Config) -> Config. -end_per_group(core_tests_with_preclared_topology, Config) -> - ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, unset_env, - [rabbitmq_shovel, topology]), - Config; end_per_group(_, Config) -> Config. @@ -112,13 +94,14 @@ init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase). end_per_testcase(Testcase, Config) -> + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, cleanup1, [Config]), rabbit_ct_helpers:testcase_finished(Config, Testcase). %% ------------------------------------------------------------------- %% Testcases. %% ------------------------------------------------------------------- quorum_queues(Config) -> - with_ch(Config, + with_amqp091_ch(Config, fun (Ch) -> shovel_test_utils:set_param( Config, @@ -128,11 +111,11 @@ quorum_queues(Config) -> {<<"src-queue-args">>, #{<<"x-queue-type">> => <<"quorum">>}}, {<<"dest-queue-args">>, #{<<"x-queue-type">> => <<"quorum">>}} ]), - publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>) + amqp091_publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>) end). stream_queues(Config) -> - with_ch(Config, + with_amqp091_ch(Config, fun (Ch) -> shovel_test_utils:set_param( Config, @@ -142,57 +125,57 @@ stream_queues(Config) -> {<<"src-queue-args">>, #{<<"x-queue-type">> => <<"stream">>}}, {<<"src-consumer-args">>, #{<<"x-stream-offset">> => <<"first">>}} ]), - publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>) + amqp091_publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>) end). set_properties_using_map(Config) -> - with_ch(Config, + with_amqp091_ch(Config, fun (Ch) -> Ps = [{<<"src-queue">>, <<"src">>}, {<<"dest-queue">>, <<"dest">>}, {<<"publish-properties">>, #{<<"cluster_id">> => <<"x">>}}], shovel_test_utils:set_param(Config, <<"test">>, Ps), #amqp_msg{props = #'P_basic'{cluster_id = Cluster}} = - publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hi">>), + amqp091_publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hi">>), <<"x">> = Cluster end). set_properties_using_proplist(Config) -> - with_ch(Config, + with_amqp091_ch(Config, fun (Ch) -> Ps = [{<<"src-queue">>, <<"src">>}, {<<"dest-queue">>, <<"dest">>}, {<<"publish-properties">>, [{<<"cluster_id">>, <<"x">>}]}], shovel_test_utils:set_param(Config, <<"test">>, Ps), #amqp_msg{props = #'P_basic'{cluster_id = Cluster}} = - publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hi">>), + amqp091_publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hi">>), <<"x">> = Cluster end). set_empty_properties_using_map(Config) -> - with_ch(Config, + with_amqp091_ch(Config, fun (Ch) -> Ps = [{<<"src-queue">>, <<"src">>}, {<<"dest-queue">>, <<"dest">>}, {<<"publish-properties">>, #{}}], shovel_test_utils:set_param(Config, <<"test">>, Ps), #amqp_msg{props = #'P_basic'{}} = - publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hi">>) + amqp091_publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hi">>) end). set_empty_properties_using_proplist(Config) -> - with_ch(Config, + with_amqp091_ch(Config, fun (Ch) -> Ps = [{<<"src-queue">>, <<"src">>}, {<<"dest-queue">>, <<"dest">>}, {<<"publish-properties">>, []}], shovel_test_utils:set_param(Config, <<"test">>, Ps), #amqp_msg{props = #'P_basic'{}} = - publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hi">>) + amqp091_publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hi">>) end). headers(Config) -> - with_ch(Config, + with_amqp091_ch(Config, fun(Ch) -> %% No headers by default shovel_test_utils:set_param(Config, @@ -201,7 +184,7 @@ headers(Config) -> {<<"dest-queue">>, <<"dest">>}]), ?assertMatch(#amqp_msg{props = #'P_basic'{headers = H0}} when H0 == undefined orelse H0 == [], - publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hi1">>)), + amqp091_publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hi1">>)), shovel_test_utils:set_param(Config, <<"test">>, @@ -211,7 +194,7 @@ headers(Config) -> {<<"add-timestamp-header">>, true}]), Timestmp = os:system_time(seconds), #amqp_msg{props = #'P_basic'{headers = Headers}} = - publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hi2">>), + amqp091_publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hi2">>), [{<<"x-shovelled">>, _, [{table, ShovelledHeader}]}, {<<"x-shovelled-timestamp">>, long, TS}] = Headers, %% We assume that the message was shovelled within a 2 second @@ -231,7 +214,7 @@ headers(Config) -> {<<"add-timestamp-header">>, true}]), #amqp_msg{props = #'P_basic'{headers = [{<<"x-shovelled-timestamp">>, long, _}]}} = - publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hi3">>), + amqp091_publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hi3">>), shovel_test_utils:set_param(Config, <<"test">>, @@ -240,221 +223,12 @@ headers(Config) -> {<<"add-forward-headers">>, true}]), #amqp_msg{props = #'P_basic'{headers = [{<<"x-shovelled">>, _, _}]}} = - publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hi4">>) + amqp091_publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hi4">>) end). -exchange(Config) -> - with_ch(Config, - fun (Ch) -> - amqp_channel:call(Ch, #'queue.declare'{queue = <<"queue">>, - durable = true}), - amqp_channel:call( - Ch, #'queue.bind'{queue = <<"queue">>, - exchange = <<"amq.topic">>, - routing_key = <<"test-key">>}), - shovel_test_utils:set_param(Config, - <<"test">>, [{<<"src-exchange">>, <<"amq.direct">>}, - {<<"src-exchange-key">>,<<"test-key">>}, - {<<"dest-exchange">>, <<"amq.topic">>}]), - publish_expect(Ch, <<"amq.direct">>, <<"test-key">>, - <<"queue">>, <<"hello">>), - shovel_test_utils:set_param(Config, - <<"test">>, [{<<"src-exchange">>, <<"amq.direct">>}, - {<<"src-exchange-key">>, <<"test-key">>}, - {<<"dest-exchange">>, <<"amq.topic">>}, - {<<"dest-exchange-key">>,<<"new-key">>}]), - publish(Ch, <<"amq.direct">>, <<"test-key">>, <<"hello">>), - expect_empty(Ch, <<"queue">>), - amqp_channel:call( - Ch, #'queue.bind'{queue = <<"queue">>, - exchange = <<"amq.topic">>, - routing_key = <<"new-key">>}), - publish_expect(Ch, <<"amq.direct">>, <<"test-key">>, - <<"queue">>, <<"hello">>) - end). - -missing_src_queue_with_src_predeclared(Config) -> - with_ch(Config, - fun (Ch) -> - amqp_channel:call( - Ch, #'queue.declare'{queue = <<"dest">>, - durable = true}), - amqp_channel:call( - Ch, #'exchange.declare'{exchange = <<"dest-ex">>}), - amqp_channel:call( - Ch, #'queue.bind'{queue = <<"dest">>, - exchange = <<"dest-ex">>, - routing_key = <<"dest-key">>}), - - shovel_test_utils:set_param_nowait(Config, - <<"test">>, [{<<"src-queue">>, <<"src">>}, - {<<"src-predeclared">>, true}, - {<<"dest-exchange">>, <<"dest-ex">>}, - {<<"dest-exchange-key">>, <<"dest-key">>}, - {<<"src-prefetch-count">>, 1}]), - shovel_test_utils:await_shovel(Config, 0, <<"test">>, terminated), - expect_missing_queue(Ch, <<"src">>), - - with_newch(Config, - fun(Ch2) -> - amqp_channel:call( - Ch2, #'queue.declare'{queue = <<"src">>, - durable = true}), - amqp_channel:call( - Ch2, #'queue.bind'{queue = <<"src">>, - exchange = <<"amq.direct">>, - routing_key = <<"src-key">>}), - shovel_test_utils:await_shovel(Config, 0, <<"test">>, running), - - publish_expect(Ch2, <<"amq.direct">>, <<"src-key">>, <<"dest">>, <<"hello!">>) - end) - end). - - -missing_src_and_dest_queue_with_false_src_and_dest_predeclared(Config) -> - with_ch(Config, - fun (Ch) -> - - shovel_test_utils:set_param( - Config, - <<"test">>, [{<<"src-queue">>, <<"src">>}, - {<<"src-predeclared">>, false}, - {<<"dest-predeclared">>, false}, - {<<"dest-queue">>, <<"dest">>}]), - publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>) - - end). - -missing_dest_queue_with_dest_predeclared(Config) -> - with_ch(Config, - fun (Ch) -> - amqp_channel:call( - Ch, #'queue.declare'{queue = <<"src">>, - durable = true}), - amqp_channel:call( - Ch, #'queue.bind'{queue = <<"src">>, - exchange = <<"amq.direct">>, - routing_key = <<"src-key">>}), - - shovel_test_utils:set_param_nowait(Config, - <<"test">>, [{<<"src-queue">>, <<"src">>}, - {<<"dest-predeclared">>, true}, - {<<"dest-queue">>, <<"dest">>}, - {<<"src-prefetch-count">>, 1}]), - shovel_test_utils:await_shovel(Config, 0, <<"test">>, terminated), - expect_missing_queue(Ch, <<"dest">>), - - with_newch(Config, - fun(Ch2) -> - amqp_channel:call( - Ch2, #'queue.declare'{queue = <<"dest">>, - durable = true}), - - shovel_test_utils:await_shovel(Config, 0, <<"test">>, running), - - publish_expect(Ch2, <<"amq.direct">>, <<"src-key">>, <<"dest">>, <<"hello!">>) - end) - end). - -missing_src_queue_without_src_predeclared(Config) -> - with_ch(Config, - fun (Ch) -> - amqp_channel:call( - Ch, #'queue.declare'{queue = <<"dest">>, - durable = true}), - amqp_channel:call( - Ch, #'exchange.declare'{exchange = <<"dest-ex">>}), - amqp_channel:call( - Ch, #'queue.bind'{queue = <<"dest">>, - exchange = <<"dest-ex">>, - routing_key = <<"dest-key">>}), - - shovel_test_utils:set_param_nowait(Config, - <<"test">>, [{<<"src-queue">>, <<"src">>}, - {<<"dest-exchange">>, <<"dest-ex">>}, - {<<"dest-exchange-key">>, <<"dest-key">>}, - {<<"src-prefetch-count">>, 1}]), - shovel_test_utils:await_shovel(Config, 0, <<"test">>, terminated), - expect_missing_queue(Ch, <<"src">>), - - with_newch(Config, - fun(Ch2) -> - amqp_channel:call( - Ch2, #'queue.declare'{queue = <<"src">>, - durable = true}), - amqp_channel:call( - Ch2, #'queue.bind'{queue = <<"src">>, - exchange = <<"amq.direct">>, - routing_key = <<"src-key">>}), - shovel_test_utils:await_shovel(Config, 0, <<"test">>, running), - - publish_expect(Ch2, <<"amq.direct">>, <<"src-key">>, <<"dest">>, <<"hello!">>) - end) - end). - - -missing_dest_queue_without_dest_predeclared(Config) -> - with_ch(Config, - fun (Ch) -> - amqp_channel:call( - Ch, #'queue.declare'{queue = <<"src">>, - durable = true}), - amqp_channel:call( - Ch, #'queue.bind'{queue = <<"src">>, - exchange = <<"amq.direct">>, - routing_key = <<"src-key">>}), - - shovel_test_utils:set_param_nowait(Config, - <<"test">>, [{<<"src-queue">>, <<"src">>}, - {<<"dest-queue">>, <<"dest">>}, - {<<"src-prefetch-count">>, 1}]), - shovel_test_utils:await_shovel(Config, 0, <<"test">>, terminated), - expect_missing_queue(Ch, <<"dest">>), - - with_newch(Config, - fun(Ch2) -> - amqp_channel:call( - Ch2, #'queue.declare'{queue = <<"dest">>, - durable = true}), - - shovel_test_utils:await_shovel(Config, 0, <<"test">>, running), - - publish_expect(Ch2, <<"amq.direct">>, <<"src-key">>, <<"dest">>, <<"hello!">>) - end) - end). - -missing_dest_exchange(Config) -> - with_ch(Config, - fun (Ch) -> - amqp_channel:call( - Ch, #'queue.declare'{queue = <<"src">>, - durable = true}), - amqp_channel:call( - Ch, #'queue.declare'{queue = <<"dest">>, - durable = true}), - amqp_channel:call( - Ch, #'queue.bind'{queue = <<"src">>, - exchange = <<"amq.direct">>, - routing_key = <<"src-key">>}), - shovel_test_utils:set_param(Config, - <<"test">>, [{<<"src-queue">>, <<"src">>}, - {<<"dest-exchange">>, <<"dest-ex">>}, - {<<"dest-exchange-key">>, <<"dest-key">>}, - {<<"src-prefetch-count">>, 1}]), - publish(Ch, <<"amq.direct">>, <<"src-key">>, <<"hello">>), - expect_empty(Ch, <<"src">>), - amqp_channel:call( - Ch, #'exchange.declare'{exchange = <<"dest-ex">>}), - amqp_channel:call( - Ch, #'queue.bind'{queue = <<"dest">>, - exchange = <<"dest-ex">>, - routing_key = <<"dest-key">>}), - publish_expect(Ch, <<"amq.direct">>, <<"src-key">>, <<"dest">>, <<"hello!">>) -end). - restart(Config) -> - with_ch(Config, + with_amqp091_ch(Config, fun (Ch) -> shovel_test_utils:set_param(Config, <<"test">>, [{<<"src-queue">>, <<"src">>}, @@ -465,57 +239,9 @@ restart(Config) -> Conns = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_direct, list, []), [catch amqp_connection:close(C) || C <- Conns], - publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>) - end). - -change_definition(Config) -> - with_ch(Config, - fun (Ch) -> - shovel_test_utils:set_param( - Config, - <<"test">>, [{<<"src-queue">>, <<"src">>}, - {<<"dest-queue">>, <<"dest">>}]), - publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>), - shovel_test_utils:set_param( - Config, - <<"test">>, [{<<"src-queue">>, <<"src">>}, - {<<"dest-queue">>, <<"dest2">>}]), - publish_expect(Ch, <<>>, <<"src">>, <<"dest2">>, <<"hello">>), - expect_empty(Ch, <<"dest">>), - shovel_test_utils:clear_param(Config, <<"test">>), - publish_expect(Ch, <<>>, <<"src">>, <<"src">>, <<"hello">>), - expect_empty(Ch, <<"dest">>), - expect_empty(Ch, <<"dest2">>) + amqp091_publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>) end). -autodelete(Config) -> - autodelete_case(Config, {<<"on-confirm">>, <<"queue-length">>, 0, 100}), - autodelete_case(Config, {<<"on-publish">>, <<"queue-length">>, 0, 100}), - %% no-ack is not compatible with explicit count - autodelete_case(Config, {<<"no-ack">>, <<"queue-length">>, 0, 100}), - ok. - -autodelete_case(Config, Args) -> - with_ch(Config, autodelete_do(Config, Args)). - -autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) -> - fun (Ch) -> - amqp_channel:call(Ch, #'confirm.select'{}), - amqp_channel:call(Ch, #'queue.declare'{queue = <<"src">>}), - publish_count(Ch, <<>>, <<"src">>, <<"hello">>, 100), - amqp_channel:wait_for_confirms(Ch), - shovel_test_utils:set_param_nowait( - Config, - <<"test">>, [{<<"src-queue">>, <<"src">>}, - {<<"dest-queue">>, <<"dest">>}, - {<<"src-prefetch-count">>, 50}, - {<<"ack-mode">>, AckMode}, - {<<"src-delete-after">>, After}]), - await_autodelete(Config, <<"test">>), - expect_count(Ch, <<"dest">>, <<"hello">>, ExpDest), - expect_count(Ch, <<"src">>, <<"hello">>, ExpSrc) - end. - validation(Config) -> URIs = [{<<"src-uri">>, <<"amqp://">>}, {<<"dest-uri">>, <<"amqp://">>}], @@ -636,7 +362,7 @@ get_connection_name(_Config) -> credit_flow(Config) -> OrigCredit = set_default_credit(Config, {20, 10}), - with_ch(Config, + with_amqp091_ch(Config, fun (Ch) -> try shovel_test_utils:set_param_nowait( @@ -726,96 +452,23 @@ credit_flow(Config) -> end). %%---------------------------------------------------------------------------- - -with_ch(Config, Fun) -> - {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - Fun(Ch), - rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), - cleanup(Config), - ok. - -with_newch(Config, Fun) -> - {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - Fun(Ch), - rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), - ok. - -publish(Ch, X, Key, Payload) when is_binary(Payload) -> - publish(Ch, X, Key, #amqp_msg{payload = Payload}); - -publish(Ch, X, Key, Msg = #amqp_msg{}) -> - amqp_channel:cast(Ch, #'basic.publish'{exchange = X, - routing_key = Key}, Msg). - -publish_expect(Ch, X, Key, Q, Payload) -> - publish(Ch, X, Key, Payload), - expect(Ch, Q, Payload). - -expect(Ch, Q, Payload) -> - amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, - no_ack = true}, self()), - CTag = receive - #'basic.consume_ok'{consumer_tag = CT} -> CT - end, - Msg = receive - {#'basic.deliver'{}, #amqp_msg{payload = Payload} = M} -> - M - after 4000 -> - exit({not_received, Payload}) - end, - amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}), - Msg. - -expect_empty(Ch, Q) -> - #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{ queue = Q }). - -expect_missing_queue(Ch, Q) -> - try - amqp_channel:call(Ch, #'queue.declare'{queue = Q, - passive = true}), - ct:fail(queue_still_exists) - catch exit:{{shutdown, {server_initiated_close, ?NOT_FOUND, _Text}}, _} -> - ok - end. -expect_missing_exchange(Ch, X) -> - try - amqp_channel:call(Ch, #'exchange.declare'{exchange = X, - passive = true}), - ct:fail(exchange_still_exists) - catch exit:{{shutdown, {server_initiated_close, ?NOT_FOUND, _Text}}, _} -> - ok - end. - publish_count(Ch, X, Key, M, Count) -> [begin - publish(Ch, X, Key, M) + amqp091_publish(Ch, X, Key, M) end || _ <- lists:seq(1, Count)]. expect_count(Ch, Q, M, Count) -> [begin - expect(Ch, Q, M) + amqp091_expect(Ch, Q, M) end || _ <- lists:seq(1, Count)], - expect_empty(Ch, Q). + amqp091_expect_empty(Ch, Q). lookup_user(Config, Name) -> {ok, User} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_access_control, check_user_login, [Name, []]), User. -cleanup(Config) -> - rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, cleanup1, [Config]). - -cleanup1(_Config) -> - [rabbit_runtime_parameters:clear(rabbit_misc:pget(vhost, P), - rabbit_misc:pget(component, P), - rabbit_misc:pget(name, P), - <<"acting-user">>) || - P <- rabbit_runtime_parameters:list()], - [rabbit_amqqueue:delete(Q, false, false, <<"acting-user">>) - || Q <- rabbit_amqqueue:list()]. - set_default_credit(Config, Value) -> Key = credit_flow_default_credit, OrigValue = rabbit_ct_broker_helpers:rpc(Config, persistent_term, get, [Key]), @@ -890,3 +543,12 @@ process_name(Pid) -> proc_info(Pid, Item) -> {Item, Value} = erpc:call(node(Pid), erlang, process_info, [Pid, Item]), Value. + +cleanup1(_Config) -> + [rabbit_runtime_parameters:clear(rabbit_misc:pget(vhost, P), + rabbit_misc:pget(component, P), + rabbit_misc:pget(name, P), + <<"acting-user">>) || + P <- rabbit_runtime_parameters:list()], + [rabbit_amqqueue:delete(Q, false, false, <<"acting-user">>) + || Q <- rabbit_amqqueue:list()]. diff --git a/deps/rabbitmq_shovel/test/amqp091_local_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/amqp091_local_dynamic_SUITE.erl new file mode 100644 index 00000000000..244d3d13829 --- /dev/null +++ b/deps/rabbitmq_shovel/test/amqp091_local_dynamic_SUITE.erl @@ -0,0 +1,778 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(amqp091_local_dynamic_SUITE). +%% Common test cases to amqp091 and local protocols +%% Both protocols behave very similar, so we can mostly join their +%% test suites and ensure a better coverage + +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). + +-compile(export_all). + +-import(rabbit_ct_helpers, [eventually/3]). +-import(shovel_test_utils, [await_autodelete/2, + set_param/3, + set_param_nowait/3, + with_amqp10_session/2, + with_amqp10_session/3, + amqp10_publish_expect/5, + amqp10_declare_queue/3, + amqp10_publish/4, + amqp10_expect_one/2, + amqp10_expect_count/3, + amqp10_expect_empty/2, + make_uri/3, + await_shovel/3, + await_shovel/4, + await_no_shovel/2, + with_amqp091_ch/2, + amqp091_publish_expect/5, + amqp091_publish/4, + amqp091_expect_empty/2, + amqp091_publish_expect/5 + ]). + +-define(PARAM, <<"test">>). + +all() -> + [ + {group, amqp091}, + {group, local}, + {group, amqp091_to_local}, + {group, local_to_amqp091} + ]. + +groups() -> + [ + {amqp091, [], tests()}, + {local, [], tests()}, + {amqp091_to_local, [], tests()}, + {local_to_amqp091, [], tests()} + ]. + +tests() -> + [ + original_dest, + exchange_dest, + exchange_to_exchange, + missing_exchange_dest, + missing_create_exchange_dest, + missing_src_queue_with_src_predeclared, + missing_dest_queue_with_dest_predeclared, + missing_src_queue_without_src_predeclared, + missing_dest_queue_without_dest_predeclared, + missing_src_and_dest_queue_with_false_src_and_dest_predeclared, + predeclared_classic_src, + predeclared_quorum_src, + predeclared_stream_first_offset_src, + predeclared_stream_last_offset_src, + missing_predeclared_src, + exchange_src, + queue_args_src, + queue_args_dest, + predeclared_classic_dest, + predeclared_quorum_dest, + missing_predeclared_dest, + exchange_status, + queue_and_exchange_src_fails, + queue_and_exchange_dest_fails, + delete_after_queue_length, + delete_after_queue_length_zero, + autodelete_classic_on_confirm_queue_length, + autodelete_quorum_on_confirm_queue_length, + autodelete_classic_on_publish_queue_length, + autodelete_quorum_on_publish_queue_length, + autodelete_classic_no_ack_queue_length, + autodelete_quorum_no_ack_queue_length + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- +init_per_suite(Config0) -> + {ok, _} = application:ensure_all_started(amqp10_client), + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config(Config0, [ + {rmq_nodename_suffix, ?MODULE}, + {ignored_crashes, [ + "server_initiated_close,404", + "writer,send_failed,closed", + "source_queue_down", + "dest_queue_down", + "inbound_link_detached", + "not_found" + ]} + ]), + rabbit_ct_helpers:run_setup_steps( + Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_suite(Config) -> + application:stop(amqp10_client), + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(amqp091, Config) -> + rabbit_ct_helpers:set_config( + Config, + [ + {src_protocol, <<"amqp091">>}, + {dest_protocol, <<"amqp091">>} + ]); +init_per_group(local, Config) -> + rabbit_ct_helpers:set_config( + Config, + [ + {src_protocol, <<"local">>}, + {dest_protocol, <<"local">>} + ]); +init_per_group(amqp091_to_local, Config) -> + rabbit_ct_helpers:set_config( + Config, + [ + {src_protocol, <<"amqp091">>}, + {dest_protocol, <<"local">>} + ]); +init_per_group(local_to_amqp091, Config) -> + rabbit_ct_helpers:set_config( + Config, + [ + {src_protocol, <<"local">>}, + {dest_protocol, <<"amqp091">>} + ]). + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config0) -> + SrcQ = list_to_binary(atom_to_list(Testcase) ++ "_src"), + DestQ = list_to_binary(atom_to_list(Testcase) ++ "_dest"), + VHost = list_to_binary(atom_to_list(Testcase) ++ "_vhost"), + ShovelArgs = [{<<"src-protocol">>, ?config(src_protocol, Config0)}, + {<<"dest-protocol">>, ?config(dest_protocol, Config0)}], + Config = rabbit_ct_helpers:set_config( + Config0, + [{srcq, SrcQ}, {destq, DestQ}, {shovel_args, ShovelArgs}, + {alt_vhost, VHost}]), + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + shovel_test_utils:clear_param(Config, ?PARAM), + rabbit_ct_broker_helpers:rpc(Config, 0, shovel_test_utils, delete_all_queues, []), + _ = rabbit_ct_broker_helpers:delete_vhost(Config, ?config(alt_vhost, Config)), + ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, unset_env, + [rabbitmq_shovel, topology]), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- +original_dest(Config) -> + %% Publish with the original routing keys, but use a different vhost + %% to avoid a loop (this is a single-node test). + Src = ?config(srcq, Config), + Dest = Src, + AltVHost = ?config(alt_vhost, Config), + ok = rabbit_ct_broker_helpers:add_vhost(Config, AltVHost), + ok = rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, AltVHost), + with_amqp10_session(Config, AltVHost, + fun (Sess) -> + amqp10_declare_queue(Sess, Dest, #{}) + end), + with_amqp10_session( + Config, + fun (Sess) -> + SrcUri = make_uri(Config, 0, <<"%2F">>), + DestUri = make_uri(Config, 0, AltVHost), + ShovelArgs = [{<<"src-uri">>, SrcUri}, + {<<"dest-uri">>, [DestUri]}, + {<<"src-queue">>, Src}] + ++ ?config(shovel_args, Config), + ok = rabbit_ct_broker_helpers:rpc( + Config, 0, rabbit_runtime_parameters, set, + [<<"/">>, <<"shovel">>, ?PARAM, ShovelArgs, none]), + await_shovel(Config, 0, ?PARAM), + _ = amqp10_publish(Sess, Src, <<"hello">>, 1) + end), + with_amqp10_session(Config, AltVHost, + fun (Sess) -> + amqp10_expect_one(Sess, Dest) + end). + +exchange_dest(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + AltExchange = <<"alt-exchange">>, + RoutingKey = <<"funky-routing-key">>, + declare_exchange(Config, <<"/">>, AltExchange), + declare_and_bind_queue(Config, <<"/">>, AltExchange, Dest, RoutingKey), + with_amqp10_session( + Config, + fun (Sess) -> + set_param(Config, ?PARAM, + ?config(shovel_args, Config) ++ + [{<<"src-queue">>, Src}, + {<<"dest-exchange">>, AltExchange}, + {<<"dest-exchange-key">>, RoutingKey} + ]), + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) + end). + +exchange_to_exchange(Config) -> + with_amqp091_ch(Config, + fun (Ch) -> + amqp_channel:call(Ch, #'queue.declare'{queue = <<"queue">>, + durable = true}), + amqp_channel:call( + Ch, #'queue.bind'{queue = <<"queue">>, + exchange = <<"amq.topic">>, + routing_key = <<"test-key">>}), + set_param(Config, + ?PARAM, [{<<"src-exchange">>, <<"amq.direct">>}, + {<<"src-exchange-key">>,<<"test-key">>}, + {<<"dest-exchange">>, <<"amq.topic">>}]), + amqp091_publish_expect(Ch, <<"amq.direct">>, <<"test-key">>, + <<"queue">>, <<"hello">>), + set_param(Config, + ?PARAM, [{<<"src-exchange">>, <<"amq.direct">>}, + {<<"src-exchange-key">>, <<"test-key">>}, + {<<"dest-exchange">>, <<"amq.topic">>}, + {<<"dest-exchange-key">>,<<"new-key">>}]), + amqp091_publish(Ch, <<"amq.direct">>, <<"test-key">>, <<"hello">>), + amqp091_expect_empty(Ch, <<"queue">>), + amqp_channel:call( + Ch, #'queue.bind'{queue = <<"queue">>, + exchange = <<"amq.topic">>, + routing_key = <<"new-key">>}), + amqp091_publish_expect(Ch, <<"amq.direct">>, <<"test-key">>, + <<"queue">>, <<"hello">>) + end). + +missing_exchange_dest(Config) -> + Src = ?config(srcq, Config), + AltExchange = <<"alt-exchange">>, + RoutingKey = <<"funky-routing-key">>, + %% If the destination exchange doesn't exist, it succeeds to start + %% the shovel. Just messages will not be routed + shovel_test_utils:set_param(Config, ?PARAM, + ?config(shovel_args, Config) ++ + [{<<"src-queue">>, Src}, + {<<"dest-exchange">>, AltExchange}, + {<<"dest-exchange-key">>, RoutingKey} + ]). + +missing_create_exchange_dest(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_amqp091_ch( + Config, + fun (Ch) -> + amqp_channel:call( + Ch, #'queue.declare'{queue = Src, + durable = true}), + amqp_channel:call( + Ch, #'queue.declare'{queue = Dest, + durable = true}), + amqp_channel:call( + Ch, #'queue.bind'{queue = Src, + exchange = <<"amq.direct">>, + routing_key = <<"src-key">>}), + set_param(Config, + ?PARAM, ?config(shovel_args, Config) ++ + [{<<"src-queue">>, Src}, + {<<"dest-exchange">>, <<"dest-ex">>}, + {<<"dest-exchange-key">>, <<"dest-key">>}]), + amqp091_publish(Ch, <<"amq.direct">>, <<"src-key">>, <<"hello">>), + amqp091_expect_empty(Ch, Src), + amqp_channel:call( + Ch, #'exchange.declare'{exchange = <<"dest-ex">>}), + amqp_channel:call( + Ch, #'queue.bind'{queue = Dest, + exchange = <<"dest-ex">>, + routing_key = <<"dest-key">>}), + amqp091_publish_expect(Ch, <<"amq.direct">>, <<"src-key">>, Dest, <<"hello!">>) +end). + +missing_src_queue_with_src_predeclared(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_amqp091_ch( + Config, + fun (Ch) -> + amqp_channel:call( + Ch, #'queue.declare'{queue = Dest, + durable = true}), + amqp_channel:call( + Ch, #'exchange.declare'{exchange = <<"dest-ex">>}), + amqp_channel:call( + Ch, #'queue.bind'{queue = Dest, + exchange = <<"dest-ex">>, + routing_key = <<"dest-key">>}), + + set_param_nowait(Config, + ?PARAM, ?config(shovel_args, Config) ++ + [{<<"src-queue">>, Src}, + {<<"src-predeclared">>, true}, + {<<"dest-exchange">>, <<"dest-ex">>}, + {<<"dest-exchange-key">>, <<"dest-key">>}]), + await_shovel(Config, 0, ?PARAM, terminated), + expect_missing_queue(Ch, Src), + + with_amqp091_ch( + Config, + fun(Ch2) -> + amqp_channel:call( + Ch2, #'queue.declare'{queue = Src, + durable = true}), + amqp_channel:call( + Ch2, #'queue.bind'{queue = Src, + exchange = <<"amq.direct">>, + routing_key = <<"src-key">>}), + await_shovel(Config, 0, ?PARAM, running), + amqp091_publish_expect(Ch2, <<"amq.direct">>, <<"src-key">>, Dest, <<"hello!">>) + end) + end). + +missing_dest_queue_with_dest_predeclared(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_amqp091_ch( + Config, + fun (Ch) -> + amqp_channel:call( + Ch, #'queue.declare'{queue = Src, + durable = true}), + amqp_channel:call( + Ch, #'queue.bind'{queue = Src, + exchange = <<"amq.direct">>, + routing_key = <<"src-key">>}), + + set_param_nowait(Config, + ?PARAM, shovel_queue_args(Config) ++ + [{<<"dest-predeclared">>, true}]), + await_shovel(Config, 0, ?PARAM, terminated), + expect_missing_queue(Ch, Dest), + + with_amqp091_ch( + Config, + fun(Ch2) -> + amqp_channel:call( + Ch2, #'queue.declare'{queue = Dest, + durable = true}), + await_shovel(Config, 0, ?PARAM, running), + amqp091_publish_expect(Ch2, <<"amq.direct">>, <<"src-key">>, Dest, <<"hello!">>) + end) + end). + +missing_src_queue_without_src_predeclared(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + ok = rabbit_ct_broker_helpers:rpc( + Config, 0, application, set_env, + [rabbitmq_shovel, topology, [{predeclared, true}]]), + with_amqp091_ch( + Config, + fun (Ch) -> + amqp_channel:call( + Ch, #'queue.declare'{queue = Dest, + durable = true}), + amqp_channel:call( + Ch, #'exchange.declare'{exchange = <<"dest-ex">>}), + amqp_channel:call( + Ch, #'queue.bind'{queue = Dest, + exchange = <<"dest-ex">>, + routing_key = <<"dest-key">>}), + + set_param_nowait(Config, ?PARAM, + ?config(shovel_args, Config) ++ + [{<<"src-queue">>, Src}, + {<<"dest-exchange">>, <<"dest-ex">>}, + {<<"dest-exchange-key">>, <<"dest-key">>}]), + await_shovel(Config, 0, ?PARAM, terminated), + expect_missing_queue(Ch, Src), + + with_amqp091_ch( + Config, + fun(Ch2) -> + amqp_channel:call( + Ch2, #'queue.declare'{queue = Src, + durable = true}), + amqp_channel:call( + Ch2, #'queue.bind'{queue = Src, + exchange = <<"amq.direct">>, + routing_key = <<"src-key">>}), + await_shovel(Config, 0, ?PARAM, running), + + amqp091_publish_expect(Ch2, <<"amq.direct">>, <<"src-key">>, Dest, <<"hello!">>) + end) + end). + + +missing_dest_queue_without_dest_predeclared(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + ok = rabbit_ct_broker_helpers:rpc( + Config, 0, application, set_env, + [rabbitmq_shovel, topology, [{predeclared, true}]]), + with_amqp091_ch( + Config, + fun (Ch) -> + amqp_channel:call( + Ch, #'queue.declare'{queue = Src, + durable = true}), + amqp_channel:call( + Ch, #'queue.bind'{queue = Src, + exchange = <<"amq.direct">>, + routing_key = <<"src-key">>}), + + set_param_nowait(Config, ?PARAM, + shovel_queue_args(Config)), + await_shovel(Config, 0, ?PARAM, terminated), + expect_missing_queue(Ch, Dest), + + with_amqp091_ch( + Config, + fun(Ch2) -> + amqp_channel:call( + Ch2, #'queue.declare'{queue = Dest, + durable = true}), + await_shovel(Config, 0, ?PARAM, running), + amqp091_publish_expect(Ch2, <<"amq.direct">>, <<"src-key">>, Dest, <<"hello!">>) + end) + end). + +missing_src_and_dest_queue_with_false_src_and_dest_predeclared(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + ok = rabbit_ct_broker_helpers:rpc( + Config, 0, application, set_env, + [rabbitmq_shovel, topology, [{predeclared, true}]]), + with_amqp10_session( + Config, + fun(Sess) -> + shovel_test_utils:set_param( + Config, ?PARAM, shovel_queue_args(Config) ++ + [{<<"src-predeclared">>, false}, + {<<"dest-predeclared">>, false}]), + amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) + end). + +predeclared_classic_src(Config) -> + predeclared_src(Config, <<"classic">>). + +predeclared_quorum_src(Config) -> + predeclared_src(Config, <<"quorum">>). + +predeclared_src(Config, Type) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_amqp10_session(Config, + fun (Sess) -> + amqp10_declare_queue(Sess, Src, #{<<"x-queue-type">> => {utf8, Type}}), + set_param(Config, ?PARAM, + shovel_queue_args(Config) ++ + [{<<"src-predeclared">>, true} + ]), + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) + end). + +predeclared_stream_first_offset_src(Config) -> + predeclared_stream_offset_src(Config, <<"first">>, 20). + +predeclared_stream_last_offset_src(Config) -> + predeclared_stream_offset_src(Config, <<"last">>, 1). + +predeclared_stream_offset_src(Config, Offset, ExpectedMsgs) -> + %% TODO test this in static + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_amqp10_session( + Config, + fun (Sess) -> + amqp10_declare_queue(Sess, Src, #{<<"x-queue-type">> => {utf8, <<"stream">>}}), + amqp10_publish(Sess, Src, <<"tag1">>, 20), + set_param(Config, ?PARAM, + shovel_queue_args(Config) ++ + [{<<"src-predeclared">>, true}, + {<<"src-consumer-args">>, #{<<"x-stream-offset">> => Offset}} + ]), + amqp10_expect_count(Sess, Dest, ExpectedMsgs), + amqp10_expect_empty(Sess, Dest), + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) + end). + +missing_predeclared_src(Config) -> + set_param_nowait(Config, ?PARAM, + shovel_queue_args(Config) ++ + [{<<"src-predeclared">>, true}]), + await_no_shovel(Config, ?PARAM), + %% The shovel parameter is only deleted when 'delete-after' + %% is used. In any other failure, the shovel should + %% remain and try to restart + ?assertNotMatch( + not_found, + rabbit_ct_broker_helpers:rpc( + Config, 0, rabbit_runtime_parameters, lookup, + [<<"/">>, <<"shovel">>, ?PARAM])). + +exchange_src(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_amqp10_session(Config, + fun (Sess) -> + set_param(Config, ?PARAM, + ?config(shovel_args, Config) ++ + [{<<"src-exchange">>, <<"amq.direct">>}, + {<<"src-exchange-key">>, Src}, + {<<"dest-queue">>, Dest} + ]), + Target = <<"/exchange/amq.direct/", Src/binary>>, + _ = amqp10_publish_expect(Sess, Target, Dest, <<"hello">>, 1) + end). + +queue_args_src(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + shovel_test_utils:set_param( + Config, ?PARAM, + shovel_queue_args(Config) ++ + [{<<"src-queue-args">>, #{<<"x-queue-type">> => <<"quorum">>}}]), + Expected = lists:sort([[Src, <<"quorum">>], [Dest, <<"classic">>]]), + ?assertMatch(Expected, + lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, + ["list_queues", "name", "type", "--no-table-headers"]))). + +queue_args_dest(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + shovel_test_utils:set_param( + Config, ?PARAM, + shovel_queue_args(Config) ++ + [{<<"dest-queue-args">>, #{<<"x-queue-type">> => <<"quorum">>}}]), + Expected = lists:sort([[Dest, <<"quorum">>], [Src, <<"classic">>]]), + ?assertMatch(Expected, + lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, + ["list_queues", "name", "type", "--no-table-headers"]))). + +predeclared_classic_dest(Config) -> + predeclared_dest(Config, <<"classic">>). + +predeclared_quorum_dest(Config) -> + predeclared_dest(Config, <<"quorum">>). + +predeclared_dest(Config, Type) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_amqp10_session(Config, + fun (Sess) -> + amqp10_declare_queue(Sess, Dest, #{<<"x-queue-type">> => {utf8, Type}}), + set_param(Config, ?PARAM, + shovel_queue_args(Config) ++ + [{<<"dest-predeclared">>, true}]), + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) + end). + +missing_predeclared_dest(Config) -> + set_param_nowait( + Config, ?PARAM, shovel_queue_args(Config) ++ + [{<<"dest-predeclared">>, true}]), + await_no_shovel(Config, ?PARAM), + %% The shovel parameter is only deleted when 'delete-after' + %% is used. In any other failure, the shovel should + %% remain and try to restart + ?assertNotMatch( + not_found, + rabbit_ct_broker_helpers:rpc( + Config, 0, rabbit_runtime_parameters, lookup, + [<<"/">>, <<"shovel">>, ?PARAM])). + +exchange_status(Config) -> + DefExchange = <<"amq.direct">>, + RK1 = <<"carrots">>, + AltExchange = <<"amq.fanout">>, + RK2 = <<"bunnies">>, + SrcProtocol = ?config(src_protocol, Config), + DestProtocol = ?config(dest_protocol, Config), + set_param(Config, ?PARAM, + ?config(shovel_args, Config) ++ + [{<<"src-exchange">>, DefExchange}, + {<<"src-exchange-key">>, RK1}, + {<<"dest-exchange">>, AltExchange}, + {<<"dest-exchange-key">>, RK2} + ]), + Status = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_shovel_status, status, []), + ?assertMatch([{_, dynamic, {running, _}, _, _}], Status), + [{_, dynamic, {running, Info}, _, _}] = Status, + ?assertMatch(SrcProtocol, proplists:get_value(src_protocol, Info)), + ?assertMatch(DestProtocol, proplists:get_value(dest_protocol, Info)), + ?assertMatch(undefined, proplists:get_value(src_queue, Info, undefined)), + ?assertMatch(DefExchange, proplists:get_value(src_exchange, Info)), + ?assertMatch(RK1, proplists:get_value(src_exchange_key, Info)), + ?assertMatch(AltExchange, proplists:get_value(dest_exchange, Info)), + ?assertMatch(RK2, proplists:get_value(dest_exchange_key, Info)), + ok. + +queue_and_exchange_src_fails(Config) -> + %% Setting both queue and exchange for source fails + try + set_param(Config, ?PARAM, + shovel_queue_args(Config) ++ + [{<<"src-exchange">>, <<"amq.direct">>}, + {<<"src-exchange-key">>, <<"bunnies">>} + ]), + throw(unexpected_success) + catch + _:{badmatch, {error_string, Reason}} -> + ?assertMatch(match, re:run(Reason, "Validation failed", [{capture, none}])) + end. + +queue_and_exchange_dest_fails(Config) -> + %% Setting both queue and exchange for dest fails + try + set_param(Config, ?PARAM, + shovel_queue_args(Config) ++ + [{<<"dest-exchange">>, <<"amq.direct">>}, + {<<"dest-exchange-key">>, <<"bunnies">>} + ]), + throw(unexpected_success) + catch + _:{badmatch, {error_string, Reason}} -> + ?assertMatch(match, re:run(Reason, "Validation failed", [{capture, none}])) + end. + +delete_after_queue_length_zero(Config) -> + Src = ?config(srcq, Config), + with_amqp10_session( + Config, + fun (Sess) -> + amqp10_declare_queue(Sess, Src, #{}), + set_param_nowait(Config, ?PARAM, + shovel_queue_args(Config) ++ + [{<<"src-predeclared">>, true}, + {<<"src-delete-after">>, <<"queue-length">>} + ]), + await_no_shovel(Config, ?PARAM), + %% The shovel parameter is only deleted when 'delete-after' + %% is used. In any other failure, the shovel should + %% remain and try to restart + ?assertMatch(not_found, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_runtime_parameters, lookup, [<<"/">>, <<"shovel">>, ?PARAM])) + end). + +delete_after_queue_length(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_amqp10_session( + Config, + fun (Sess) -> + amqp10_declare_queue(Sess, Src, #{}), + amqp10_publish(Sess, Src, <<"tag1">>, 18), + set_param_nowait(Config, ?PARAM, + shovel_queue_args(Config) ++ + [{<<"src-predeclared">>, true}, + {<<"src-delete-after">>, <<"queue-length">>} + ]), + %% The shovel parameter is only deleted when 'delete-after' + %% is used. In any other failure, the shovel should + %% remain and try to restart + amqp10_expect_count(Sess, Dest, 18), + await_autodelete(Config, ?PARAM), + amqp10_publish(Sess, Src, <<"tag1">>, 5), + amqp10_expect_empty(Sess, Dest) + end). + + +autodelete_classic_on_confirm_queue_length(Config) -> + autodelete(Config, <<"classic">>, <<"on-confirm">>). + +autodelete_quorum_on_confirm_queue_length(Config) -> + autodelete(Config, <<"quorum">>, <<"on-confirm">>). + +autodelete_classic_on_publish_queue_length(Config) -> + autodelete(Config, <<"classic">>, <<"on-publish">>). + +autodelete_quorum_on_publish_queue_length(Config) -> + autodelete(Config, <<"quorum">>, <<"on-publish">>). + +autodelete_classic_no_ack_queue_length(Config) -> + autodelete(Config, <<"classic">>, <<"no-ack">>). + +autodelete_quorum_no_ack_queue_length(Config) -> + autodelete(Config, <<"quorum">>, <<"no-ack">>). + +autodelete(Config, Type, AckMode) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_amqp10_session( + Config, + fun (Sess) -> + amqp10_declare_queue(Sess, Src, #{<<"x-queue-type">> => {utf8, Type}}), + amqp10_declare_queue(Sess, Dest, #{<<"x-queue-type">> => {utf8, Type}}), + amqp10_publish(Sess, Src, <<"hello">>, 100), + Expected0 = lists:sort([[Dest, Type, <<"0">>], [Src, Type, <<"100">>]]), + ?awaitMatch(Expected0, + lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, + ["list_queues", "name", "type", "messages", "--no-table-headers"])), + 30_000), + ExtraArgs = [{<<"ack-mode">>, AckMode}, + {<<"src-delete-after">>, <<"queue-length">>}], + ShovelArgs = shovel_queue_args(Config) ++ ExtraArgs, + set_param_nowait(Config, ?PARAM, ShovelArgs), + await_autodelete(Config, ?PARAM), + amqp10_expect_count(Sess, Dest, 100) + end). + +%%---------------------------------------------------------------------------- +shovel_queue_args(Config) -> + ?config(shovel_args, Config) ++ + [{<<"src-queue">>, ?config(srcq, Config)}, + {<<"dest-queue">>, ?config(destq, Config)}]. + +declare_and_bind_queue(Config, VHost, Exchange, QName, RoutingKey) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost), + {ok, Ch} = amqp_connection:open_channel(Conn), + ?assertEqual( + {'queue.declare_ok', QName, 0, 0}, + amqp_channel:call( + Ch, #'queue.declare'{queue = QName, durable = true, + arguments = [{<<"x-queue-type">>, longstr, <<"classic">>}]})), + ?assertMatch( + #'queue.bind_ok'{}, + amqp_channel:call(Ch, #'queue.bind'{ + queue = QName, + exchange = Exchange, + routing_key = RoutingKey + })), + rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection(Conn). + +declare_exchange(Config, VHost, Exchange) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost), + {ok, Ch} = amqp_connection:open_channel(Conn), + ?assertMatch( + #'exchange.declare_ok'{}, + amqp_channel:call(Ch, #'exchange.declare'{exchange = Exchange})), + rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection(Conn). + +expect_missing_queue(Ch, Q) -> + try + amqp_channel:call(Ch, #'queue.declare'{queue = Q, + passive = true}), + ct:fail(queue_still_exists) + catch exit:{{shutdown, {server_initiated_close, ?NOT_FOUND, _Text}}, _} -> + ok + end. diff --git a/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl index 5fdacaf4102..26c341fc3e2 100644 --- a/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl @@ -34,15 +34,12 @@ groups() -> simple, change_definition, simple_amqp10_dest, - simple_amqp10_src, amqp091_to_amqp10_with_dead_lettering, - amqp10_to_amqp091_application_properties, test_amqp10_delete_after_queue_length ]}, {with_map_config, [], [ simple, - simple_amqp10_dest, - simple_amqp10_src + simple_amqp10_dest ]} ]. @@ -177,69 +174,6 @@ test_amqp10_destination(Config, Src, Dest, Sess, Protocol, ProtocolSrc) -> <<"x-opt-shovelled-timestamp">> := _ }, Anns). -simple_amqp10_src(Config) -> - MapConfig = ?config(map_config, Config), - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param( - Config, - ?PARAM, [{<<"src-protocol">>, <<"amqp10">>}, - {<<"src-address">>, Src}, - {<<"dest-protocol">>, <<"amqp091">>}, - {<<"dest-queue">>, Dest}, - {<<"add-forward-headers">>, true}, - {<<"dest-add-timestamp-header">>, true}, - {<<"publish-properties">>, - case MapConfig of - true -> #{<<"cluster_id">> => <<"x">>}; - _ -> [{<<"cluster_id">>, <<"x">>}] - end} - ]), - _Msg = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1), - % the fidelity loss is quite high when consuming using the amqp10 - % plugin. For example custom headers aren't current translated. - % This isn't due to the shovel though. - ok - end). - -amqp10_to_amqp091_application_properties(Config) -> - MapConfig = ?config(map_config, Config), - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param( - Config, - ?PARAM, [{<<"src-protocol">>, <<"amqp10">>}, - {<<"src-address">>, Src}, - {<<"dest-protocol">>, <<"amqp091">>}, - {<<"dest-queue">>, Dest}, - {<<"add-forward-headers">>, true}, - {<<"dest-add-timestamp-header">>, true}, - {<<"publish-properties">>, - case MapConfig of - true -> #{<<"cluster_id">> => <<"x">>}; - _ -> [{<<"cluster_id">>, <<"x">>}] - end} - ]), - - MsgSent = amqp10_msg:set_application_properties( - #{<<"key">> => <<"value">>}, - amqp10_msg:set_headers( - #{durable => true}, - amqp10_msg:new(<<"tag1">>, <<"hello">>, false))), - - Msg = publish_expect_msg(Sess, Src, Dest, MsgSent), - AppProps = amqp10_msg:application_properties(Msg), - ct:pal("MSG ~p", [Msg]), - - ?assertMatch(#{<<"key">> := <<"value">>}, - AppProps), - ok - end). - change_definition(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), diff --git a/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl index 1382bdafad0..743ffc83e34 100644 --- a/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl @@ -32,53 +32,8 @@ groups() -> [ {tests, [], [ local_to_local_opt_headers, - local_to_local_original_dest, - local_to_local_exchange_dest, - local_to_local_missing_exchange_dest, - local_to_local_predeclared_src, - local_to_local_predeclared_quorum_src, - local_to_local_predeclared_stream_first_offset_src, - local_to_local_predeclared_stream_last_offset_src, - local_to_local_missing_predeclared_src, - local_to_local_exchange_src, - local_to_local_queue_args_src, - local_to_local_queue_args_dest, - local_to_local_predeclared_dest, - local_to_local_predeclared_quorum_dest, - local_to_local_missing_predeclared_dest, - local_to_local_queue_status, - local_to_local_exchange_status, - local_to_local_queue_and_exchange_src_fails, - local_to_local_queue_and_exchange_dest_fails, - local_to_local_delete_after_never, - local_to_local_delete_after_queue_length, - local_to_local_delete_after_queue_length_zero, - local_to_local_no_ack, - local_to_local_quorum_no_ack, local_to_local_stream_no_ack, - local_to_local_on_publish, - local_to_local_quorum_on_publish, - local_to_local_stream_on_publish, - local_to_local_on_confirm, - local_to_local_quorum_on_confirm, - local_to_local_stream_on_confirm, - local_to_local_reject_publish, - local_to_amqp091, - local_to_amqp10, - amqp091_to_local, - amqp10_to_local, - local_to_local_delete_src_queue, local_to_local_delete_dest_queue, - local_to_local_vhost_access, - local_to_local_user_access, - local_to_local_credit_flow_on_confirm, - local_to_local_credit_flow_on_publish, - local_to_local_credit_flow_no_ack, - local_to_local_quorum_credit_flow_on_confirm, - local_to_local_quorum_credit_flow_on_publish, - local_to_local_quorum_credit_flow_no_ack, - local_to_local_stream_credit_flow_on_confirm, - local_to_local_stream_credit_flow_on_publish, local_to_local_stream_credit_flow_no_ack, local_to_local_simple_uri, local_to_local_counters @@ -159,440 +114,6 @@ local_to_local_opt_headers(Config) -> amqp10_msg:message_annotations(Msg)) end). -local_to_local_original_dest(Config) -> - %% Publish with the original routing keys, but use a different vhost - %% to avoid a loop (this is a single-node test). - Src = ?config(srcq, Config), - Dest = Src, - AltVHost = ?config(alt_vhost, Config), - ok = rabbit_ct_broker_helpers:add_vhost(Config, AltVHost), - ok = rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, AltVHost), - declare_queue(Config, AltVHost, Dest), - with_amqp10_session( - Config, - fun (Sess) -> - SrcUri = shovel_test_utils:make_uri(Config, 0, <<"%2F">>), - DestUri = shovel_test_utils:make_uri(Config, 0, AltVHost), - ok = rabbit_ct_broker_helpers:rpc( - Config, 0, rabbit_runtime_parameters, set, - [<<"/">>, <<"shovel">>, ?PARAM, [{<<"src-uri">>, SrcUri}, - {<<"dest-uri">>, [DestUri]}, - {<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"dest-protocol">>, <<"local">>}], - none]), - shovel_test_utils:await_shovel(Config, 0, ?PARAM), - _ = amqp10_publish(Sess, Src, <<"hello">>, 1) - end), - with_amqp10_session(Config, AltVHost, - fun (Sess) -> - amqp10_expect_one(Sess, Dest) - end). - -local_to_local_exchange_dest(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - AltExchange = <<"alt-exchange">>, - RoutingKey = <<"funky-routing-key">>, - declare_exchange(Config, <<"/">>, AltExchange), - declare_and_bind_queue(Config, <<"/">>, AltExchange, Dest, RoutingKey), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-exchange">>, AltExchange}, - {<<"dest-exchange-key">>, RoutingKey} - ]), - _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) - end). - -local_to_local_missing_exchange_dest(Config) -> - Src = ?config(srcq, Config), - AltExchange = <<"alt-exchange">>, - RoutingKey = <<"funky-routing-key">>, - %% If the destination exchange doesn't exist, it succeeds to start - %% the shovel. Just messages will not be routed - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-exchange">>, AltExchange}, - {<<"dest-exchange-key">>, RoutingKey} - ]). - -local_to_local_predeclared_src(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - declare_queue(Config, <<"/">>, Src), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"src-predeclared">>, true}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest} - ]), - _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) - end). - -local_to_local_predeclared_quorum_src(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - declare_queue(Config, <<"/">>, Src, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"src-predeclared">>, true}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest} - ]), - _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) - end). - -local_to_local_predeclared_stream_first_offset_src(Config) -> - %% TODO test this in static - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - declare_queue(Config, <<"/">>, Src, [{<<"x-queue-type">>, longstr, <<"stream">>}]), - with_amqp10_session(Config, - fun (Sess) -> - amqp10_publish(Sess, Src, <<"tag1">>, 20), - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"src-predeclared">>, true}, - {<<"src-consumer-args">>, #{<<"x-stream-offset">> => <<"first">>}}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest} - ]), - amqp10_expect_count(Sess, Dest, 20), - amqp10_expect_empty(Sess, Dest), - _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) - end). - -local_to_local_predeclared_stream_last_offset_src(Config) -> - %% TODO test this in static - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - declare_queue(Config, <<"/">>, Src, [{<<"x-queue-type">>, longstr, <<"stream">>}]), - with_amqp10_session(Config, - fun (Sess) -> - amqp10_publish(Sess, Src, <<"tag1">>, 20), - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"src-predeclared">>, true}, - {<<"src-consumer-args">>, #{<<"x-stream-offset">> => <<"last">>}}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest} - ]), - %% Deliver last - amqp10_expect_count(Sess, Dest, 1), - amqp10_expect_empty(Sess, Dest), - _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) - end). - -local_to_local_missing_predeclared_src(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - shovel_test_utils:set_param_nowait(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"src-predeclared">>, true}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest} - ]), - shovel_test_utils:await_no_shovel(Config, ?PARAM), - %% The shovel parameter is only deleted when 'delete-after' - %% is used. In any other failure, the shovel should - %% remain and try to restart - ?assertNotMatch( - not_found, - rabbit_ct_broker_helpers:rpc( - Config, 0, rabbit_runtime_parameters, lookup, - [<<"/">>, <<"shovel">>, ?PARAM])). - -local_to_local_exchange_src(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-exchange">>, <<"amq.direct">>}, - {<<"src-exchange-key">>, Src}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest} - ]), - Target = <<"/exchange/amq.direct/", Src/binary>>, - _ = amqp10_publish_expect(Sess, Target, Dest, <<"hello">>, 1) - end). - -local_to_local_queue_args_src(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"src-queue-args">>, #{<<"x-queue-type">> => <<"quorum">>}}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest} - ]), - Expected = lists:sort([[Src, <<"quorum">>], [Dest, <<"classic">>]]), - ?assertMatch(Expected, - lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list( - Config, 0, - ["list_queues", "name", "type", "--no-table-headers"]))). - -local_to_local_queue_args_dest(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest}, - {<<"dest-queue-args">>, #{<<"x-queue-type">> => <<"quorum">>}} - ]), - Expected = lists:sort([[Dest, <<"quorum">>], [Src, <<"classic">>]]), - ?assertMatch(Expected, - lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list( - Config, 0, - ["list_queues", "name", "type", "--no-table-headers"]))). - -local_to_local_predeclared_dest(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - declare_queue(Config, <<"/">>, Dest), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"dest-predeclared">>, true}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest} - ]), - _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) - end). - -local_to_local_predeclared_quorum_dest(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - declare_queue(Config, <<"/">>, Dest, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"dest-predeclared">>, true}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest} - ]), - _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) - end). - -local_to_local_missing_predeclared_dest(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - shovel_test_utils:set_param_nowait( - Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"dest-predeclared">>, true}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest} - ]), - shovel_test_utils:await_no_shovel(Config, ?PARAM), - %% The shovel parameter is only deleted when 'delete-after' - %% is used. In any other failure, the shovel should - %% remain and try to restart - ?assertNotMatch( - not_found, - rabbit_ct_broker_helpers:rpc( - Config, 0, rabbit_runtime_parameters, lookup, - [<<"/">>, <<"shovel">>, ?PARAM])). - -local_to_local_queue_status(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest} - ]), - Status = rabbit_ct_broker_helpers:rpc(Config, 0, - rabbit_shovel_status, status, []), - ?assertMatch([{_, dynamic, {running, _}, _, _}], Status), - [{_, dynamic, {running, Info}, _, _}] = Status, - ?assertMatch(<<"local">>, proplists:get_value(src_protocol, Info)), - ?assertMatch(<<"local">>, proplists:get_value(dest_protocol, Info)), - ?assertMatch(Src, proplists:get_value(src_queue, Info)), - ?assertMatch(Dest, proplists:get_value(dest_queue, Info)), - ok. - -local_to_local_exchange_status(Config) -> - DefExchange = <<"amq.direct">>, - RK1 = <<"carrots">>, - AltExchange = <<"amq.fanout">>, - RK2 = <<"bunnies">>, - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-exchange">>, DefExchange}, - {<<"src-exchange-key">>, RK1}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-exchange">>, AltExchange}, - {<<"dest-exchange-key">>, RK2} - ]), - Status = rabbit_ct_broker_helpers:rpc(Config, 0, - rabbit_shovel_status, status, []), - ?assertMatch([{_, dynamic, {running, _}, _, _}], Status), - [{_, dynamic, {running, Info}, _, _}] = Status, - ?assertMatch(<<"local">>, proplists:get_value(src_protocol, Info)), - ?assertMatch(<<"local">>, proplists:get_value(dest_protocol, Info)), - ?assertMatch(match, re:run(proplists:get_value(src_queue, Info), - "amq\.gen.*", [{capture, none}])), - ?assertMatch(DefExchange, proplists:get_value(src_exchange, Info)), - ?assertMatch(RK1, proplists:get_value(src_exchange_key, Info)), - ?assertMatch(AltExchange, proplists:get_value(dest_exchange, Info)), - ?assertMatch(RK2, proplists:get_value(dest_exchange_key, Info)), - ok. - -local_to_local_queue_and_exchange_src_fails(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - %% Setting both queue and exchange for source fails - try - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"src-exchange">>, <<"amq.direct">>}, - {<<"src-exchange-key">>, <<"bunnies">>}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest} - ]), - throw(unexpected_success) - catch - _:{badmatch, {error_string, Reason}} -> - ?assertMatch(match, re:run(Reason, "Validation failed", [{capture, none}])) - end. - -local_to_local_queue_and_exchange_dest_fails(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - %% Setting both queue and exchange for dest fails - try - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest}, - {<<"dest-exchange">>, <<"amq.direct">>}, - {<<"dest-exchange-key">>, <<"bunnies">>} - ]), - throw(unexpected_success) - catch - _:{badmatch, {error_string, Reason}} -> - ?assertMatch(match, re:run(Reason, "Validation failed", [{capture, none}])) - end. - -local_to_local_delete_after_never(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest} - ]), - amqp10_publish(Sess, Src, <<"tag1">>, 20), - amqp10_expect_count(Sess, Dest, 20) - end). - -local_to_local_delete_after_queue_length_zero(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - declare_queue(Config, <<"/">>, Src), - shovel_test_utils:set_param_nowait(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-predeclared">>, true}, - {<<"src-queue">>, Src}, - {<<"src-delete-after">>, <<"queue-length">>}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest} - ]), - shovel_test_utils:await_no_shovel(Config, ?PARAM), - %% The shovel parameter is only deleted when 'delete-after' - %% is used. In any other failure, the shovel should - %% remain and try to restart - ?assertMatch(not_found, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_runtime_parameters, lookup, [<<"/">>, <<"shovel">>, ?PARAM])). - -local_to_local_delete_after_queue_length(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - declare_queue(Config, <<"/">>, Src), - with_amqp10_session(Config, - fun (Sess) -> - amqp10_publish(Sess, Src, <<"tag1">>, 18), - shovel_test_utils:set_param_nowait(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-predeclared">>, true}, - {<<"src-queue">>, Src}, - {<<"src-delete-after">>, <<"queue-length">>}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest} - ]), - %% The shovel parameter is only deleted when 'delete-after' - %% is used. In any other failure, the shovel should - %% remain and try to restart - amqp10_expect_count(Sess, Dest, 18), - await_autodelete(Config, ?PARAM), - amqp10_publish(Sess, Src, <<"tag1">>, 5), - amqp10_expect_empty(Sess, Dest) - end). - -local_to_local_no_ack(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest}, - {<<"ack-mode">>, <<"no-ack">>} - ]), - _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) - end). - -local_to_local_quorum_no_ack(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - VHost = <<"/">>, - declare_queue(Config, VHost, Src, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), - declare_queue(Config, VHost, Dest, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-predeclared">>, true}, - {<<"src-queue">>, Src}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-predeclared">>, true}, - {<<"dest-queue">>, Dest}, - {<<"ack-mode">>, <<"no-ack">>} - ]), - _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) - end). - local_to_local_stream_no_ack(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), @@ -619,231 +140,6 @@ local_to_local_stream_no_ack(Config) -> amqp10_client:detach_link(Receiver) end). -local_to_local_on_confirm(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest}, - {<<"ack-mode">>, <<"on-confirm">>} - ]), - _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) - end). - -local_to_local_quorum_on_confirm(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - VHost = <<"/">>, - declare_queue(Config, VHost, Src, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), - declare_queue(Config, VHost, Dest, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-predeclared">>, true}, - {<<"src-queue">>, Src}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-predeclared">>, true}, - {<<"dest-queue">>, Dest}, - {<<"ack-mode">>, <<"on-confirm">>} - ]), - _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) - end). - -local_to_local_stream_on_confirm(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - VHost = <<"/">>, - declare_queue(Config, VHost, Src, [{<<"x-queue-type">>, longstr, <<"stream">>}]), - declare_queue(Config, VHost, Dest, [{<<"x-queue-type">>, longstr, <<"stream">>}]), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-predeclared">>, true}, - {<<"src-queue">>, Src}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-predeclared">>, true}, - {<<"dest-queue">>, Dest}, - {<<"ack-mode">>, <<"on-confirm">>} - ]), - Receiver = amqp10_subscribe(Sess, Dest), - amqp10_publish(Sess, Src, <<"tag1">>, 10), - ?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 10}, _}], - rabbit_ct_broker_helpers:rpc(Config, 0, - rabbit_shovel_status, status, []), - 30000), - _ = amqp10_expect(Receiver, 10, []), - amqp10_client:detach_link(Receiver) - end). - -local_to_local_on_publish(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest}, - {<<"ack-mode">>, <<"on-publish">>} - ]), - _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) - end). - -local_to_local_quorum_on_publish(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - VHost = <<"/">>, - declare_queue(Config, VHost, Src, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), - declare_queue(Config, VHost, Dest, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-predeclared">>, true}, - {<<"src-queue">>, Src}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-predeclared">>, true}, - {<<"dest-queue">>, Dest}, - {<<"ack-mode">>, <<"on-publish">>} - ]), - _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) - end). - -local_to_local_stream_on_publish(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - VHost = <<"/">>, - declare_queue(Config, VHost, Src, [{<<"x-queue-type">>, longstr, <<"stream">>}]), - declare_queue(Config, VHost, Dest, [{<<"x-queue-type">>, longstr, <<"stream">>}]), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-predeclared">>, true}, - {<<"src-queue">>, Src}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-predeclared">>, true}, - {<<"dest-queue">>, Dest}, - {<<"ack-mode">>, <<"on-publish">>} - ]), - Receiver = amqp10_subscribe(Sess, Dest), - amqp10_publish(Sess, Src, <<"tag1">>, 10), - ?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 10}, _}], - rabbit_ct_broker_helpers:rpc(Config, 0, - rabbit_shovel_status, status, []), - 30000), - _ = amqp10_expect(Receiver, 10, []), - amqp10_client:detach_link(Receiver) - end). - -local_to_local_reject_publish(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - declare_queue(Config, <<"/">>, Dest, [{<<"x-max-length">>, long, 1}, - {<<"x-overflow">>, longstr, <<"reject-publish">>} - ]), - with_amqp10_session( - Config, - fun (Sess) -> - amqp10_publish(Sess, Src, <<"tag1">>, 5), - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-predeclared">>, true}, - {<<"dest-queue">>, Dest}, - {<<"ack-mode">>, <<"on-confirm">>} - ]), - amqp10_expect_count(Sess, Dest, 1) - end). - -local_to_amqp091(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"dest-protocol">>, <<"amqp091">>}, - {<<"dest-queue">>, Dest} - ]), - _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) - end). - -local_to_amqp10(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"dest-protocol">>, <<"amqp10">>}, - {<<"dest-address">>, Dest} - ]), - _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) - end). - -amqp091_to_local(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"amqp091">>}, - {<<"src-queue">>, Src}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest} - ]), - _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) - end). - -amqp10_to_local(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"amqp10">>}, - {<<"src-address">>, Src}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest} - ]), - _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) - end). - -local_to_local_delete_src_queue(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest} - ]), - _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1), - ?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 1}, _}], - rabbit_ct_broker_helpers:rpc(Config, 0, - rabbit_shovel_status, status, []), - 30000), - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queue, - [Src, <<"/">>]), - ?awaitMatch([{_Name, dynamic, {terminated,source_queue_down}, _, _}], - rabbit_ct_broker_helpers:rpc(Config, 0, - rabbit_shovel_status, status, []), - 30000) - end). - local_to_local_delete_dest_queue(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), @@ -868,100 +164,6 @@ local_to_local_delete_dest_queue(Config) -> 30000) end). -local_to_local_vhost_access(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - AltVHost = ?config(alt_vhost, Config), - ok = rabbit_ct_broker_helpers:add_vhost(Config, AltVHost), - Uri = shovel_test_utils:make_uri(Config, 0, AltVHost), - ok = rabbit_ct_broker_helpers:rpc( - Config, 0, rabbit_runtime_parameters, set, - [<<"/">>, <<"shovel">>, ?PARAM, [{<<"src-uri">>, Uri}, - {<<"dest-uri">>, [Uri]}, - {<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest}], - none]), - shovel_test_utils:await_no_shovel(Config, ?PARAM). - -local_to_local_user_access(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - Uri = shovel_test_utils:make_uri( - Config, 0, <<"guest">>, <<"forgotmypassword">>, <<"%2F">>), - ok = rabbit_ct_broker_helpers:rpc( - Config, 0, rabbit_runtime_parameters, set, - [<<"/">>, <<"shovel">>, ?PARAM, [{<<"src-uri">>, Uri}, - {<<"dest-uri">>, [Uri]}, - {<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest}], - none]), - shovel_test_utils:await_no_shovel(Config, ?PARAM). - -local_to_local_credit_flow_on_confirm(Config) -> - local_to_local_credit_flow(Config, <<"on-confirm">>). - -local_to_local_credit_flow_on_publish(Config) -> - local_to_local_credit_flow(Config, <<"on-publish">>). - -local_to_local_credit_flow_no_ack(Config) -> - local_to_local_credit_flow(Config, <<"no-ack">>). - -local_to_local_credit_flow(Config, AckMode) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest}, - {<<"ack-mode">>, AckMode} - ]), - amqp10_publish(Sess, Src, <<"tag1">>, 1000), - amqp10_expect_count(Sess, Dest, 1000) - end). - -local_to_local_quorum_credit_flow_on_confirm(Config) -> - local_to_local_quorum_credit_flow(Config, <<"on-confirm">>). - -local_to_local_quorum_credit_flow_on_publish(Config) -> - local_to_local_quorum_credit_flow(Config, <<"on-publish">>). - -local_to_local_quorum_credit_flow_no_ack(Config) -> - local_to_local_quorum_credit_flow(Config, <<"no-ack">>). - -local_to_local_quorum_credit_flow(Config, AckMode) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - VHost = <<"/">>, - declare_queue(Config, VHost, Src, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), - declare_queue(Config, VHost, Dest, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"src-predeclared">>, true}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest}, - {<<"dest-predeclared">>, true}, - {<<"ack-mode">>, AckMode} - ]), - amqp10_publish(Sess, Src, <<"tag1">>, 1000), - amqp10_expect_count(Sess, Dest, 1000) - end). - -local_to_local_stream_credit_flow_on_confirm(Config) -> - local_to_local_stream_credit_flow(Config, <<"on-confirm">>). - -local_to_local_stream_credit_flow_on_publish(Config) -> - local_to_local_stream_credit_flow(Config, <<"on-publish">>). - local_to_local_stream_credit_flow_no_ack(Config) -> local_to_local_stream_credit_flow(Config, <<"no-ack">>). diff --git a/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl index 2d007476449..15226936350 100644 --- a/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl @@ -18,11 +18,20 @@ -import(shovel_test_utils, [await_autodelete/2, set_param/3, set_param_nowait/3, + clear_param/2, with_amqp10_session/2, amqp10_publish_expect/5, amqp10_declare_queue/3, amqp10_publish/4, - amqp10_expect_count/3 + amqp10_publish_msg/4, + amqp10_expect/3, + amqp10_expect_one/2, + amqp10_expect_count/3, + amqp10_expect_empty/2, + amqp10_subscribe/2, + make_uri/2, make_uri/3, + make_uri/5, + await_no_shovel/2 ]). -define(PARAM, <<"test">>). @@ -62,6 +71,21 @@ tests() -> simple_quorum_no_ack, simple_quorum_on_confirm, simple_quorum_on_publish, + simple_stream_on_confirm, + simple_stream_on_publish, + %% Credit flow tests are just simple tests that publish a high + %% number of messages, on the attempt to trigger the different + %% credit flow mechanisms. Having the same test twice (simple/credit) + %% helps to isolate the problem. + credit_flow_classic_no_ack, + credit_flow_classic_on_confirm, + credit_flow_classic_on_publish, + credit_flow_quorum_no_ack, + credit_flow_quorum_on_confirm, + credit_flow_quorum_on_publish, + credit_flow_stream_on_confirm, + credit_flow_stream_on_publish, + delete_after_never, autodelete_classic_on_confirm, autodelete_quorum_on_confirm, autodelete_classic_on_publish, @@ -78,7 +102,13 @@ tests() -> %% autodelete_classic_on_confirm_with_rejections, %% autodelete_quorum_on_confirm_with_rejections, autodelete_classic_on_publish_with_rejections, - autodelete_quorum_on_publish_with_rejections + autodelete_quorum_on_publish_with_rejections, + no_vhost_access, + no_user_access, + application_properties, + delete_src_queue, + shovel_status, + change_definition ]. %% ------------------------------------------------------------------- @@ -94,7 +124,8 @@ init_per_suite(Config0) -> "server_initiated_close,404", "writer,send_failed,closed", "source_queue_down", - "dest_queue_down" + "dest_queue_down", + "inbound_link_detached" ]} ]), rabbit_ct_helpers:run_setup_steps( @@ -196,18 +227,21 @@ end_per_group(_, Config) -> init_per_testcase(Testcase, Config0) -> SrcQ = list_to_binary(atom_to_list(Testcase) ++ "_src"), DestQ = list_to_binary(atom_to_list(Testcase) ++ "_dest"), + VHost = list_to_binary(atom_to_list(Testcase) ++ "_vhost"), ShovelArgs = [{<<"src-protocol">>, ?config(src_protocol, Config0)}, {<<"dest-protocol">>, ?config(dest_protocol, Config0)}, {?config(src_address, Config0), SrcQ}, {?config(dest_address, Config0), DestQ}], Config = rabbit_ct_helpers:set_config( Config0, - [{srcq, SrcQ}, {destq, DestQ}, {shovel_args, ShovelArgs}]), + [{srcq, SrcQ}, {destq, DestQ}, {shovel_args, ShovelArgs}, + {alt_vhost, VHost}]), rabbit_ct_helpers:testcase_started(Config, Testcase). end_per_testcase(Testcase, Config) -> shovel_test_utils:clear_param(Config, ?PARAM), rabbit_ct_broker_helpers:rpc(Config, 0, shovel_test_utils, delete_all_queues, []), + _ = rabbit_ct_broker_helpers:delete_vhost(Config, ?config(alt_vhost, Config)), rabbit_ct_helpers:testcase_finished(Config, Testcase). %% ------------------------------------------------------------------- @@ -227,24 +261,73 @@ simple(Config) -> end). simple_classic_no_ack(Config) -> - simple_queue_type_ack_mode(Config, <<"classic">>, <<"no-ack">>). + simple_queue_type_ack_mode(Config, <<"classic">>, <<"no-ack">>, 10). simple_classic_on_confirm(Config) -> - simple_queue_type_ack_mode(Config, <<"classic">>, <<"on-confirm">>). + simple_queue_type_ack_mode(Config, <<"classic">>, <<"on-confirm">>, 10). simple_classic_on_publish(Config) -> - simple_queue_type_ack_mode(Config, <<"classic">>, <<"on-publish">>). + simple_queue_type_ack_mode(Config, <<"classic">>, <<"on-publish">>, 10). simple_quorum_no_ack(Config) -> - simple_queue_type_ack_mode(Config, <<"quorum">>, <<"no-ack">>). + simple_queue_type_ack_mode(Config, <<"quorum">>, <<"no-ack">>, 10). simple_quorum_on_confirm(Config) -> - simple_queue_type_ack_mode(Config, <<"quorum">>, <<"on-confirm">>). + simple_queue_type_ack_mode(Config, <<"quorum">>, <<"on-confirm">>, 10). simple_quorum_on_publish(Config) -> - simple_queue_type_ack_mode(Config, <<"quorum">>, <<"on-publish">>). + simple_queue_type_ack_mode(Config, <<"quorum">>, <<"on-publish">>, 10). -simple_queue_type_ack_mode(Config, Type, AckMode) -> +simple_stream_on_confirm(Config) -> + simple_stream(Config, <<"on-confirm">>, 10). + +simple_stream_on_publish(Config) -> + simple_stream(Config, <<"on-publish">>, 10). + +simple_stream(Config, AckMode, NMsgs) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_amqp10_session(Config, + fun (Sess) -> + amqp10_declare_queue(Sess, Src, #{<<"x-queue-type">> => {utf8, <<"stream">>}}), + amqp10_declare_queue(Sess, Dest, #{<<"x-queue-type">> => {utf8, <<"stream">>}}), + set_param(Config, ?PARAM, + ?config(shovel_args, Config) ++ [{<<"ack-mode">>, AckMode}]), + Receiver = amqp10_subscribe(Sess, Dest), + amqp10_publish(Sess, Src, <<"tag1">>, NMsgs), + ?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := NMsgs}, _}], + rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_shovel_status, status, []), + 30000), + _ = amqp10_expect(Receiver, NMsgs, []), + amqp10_client:detach_link(Receiver) + end). + +credit_flow_classic_no_ack(Config) -> + simple_queue_type_ack_mode(Config, <<"classic">>, <<"no-ack">>, 5000). + +credit_flow_classic_on_confirm(Config) -> + simple_queue_type_ack_mode(Config, <<"classic">>, <<"on-confirm">>, 5000). + +credit_flow_classic_on_publish(Config) -> + simple_queue_type_ack_mode(Config, <<"classic">>, <<"on-publish">>, 5000). + +credit_flow_quorum_no_ack(Config) -> + simple_queue_type_ack_mode(Config, <<"quorum">>, <<"no-ack">>, 5000). + +credit_flow_quorum_on_confirm(Config) -> + simple_queue_type_ack_mode(Config, <<"quorum">>, <<"on-confirm">>, 5000). + +credit_flow_quorum_on_publish(Config) -> + simple_queue_type_ack_mode(Config, <<"quorum">>, <<"on-publish">>, 5000). + +credit_flow_stream_on_confirm(Config) -> + simple_stream(Config, <<"on-confirm">>, 5000). + +credit_flow_stream_on_publish(Config) -> + simple_stream(Config, <<"on-publish">>, 5000). + +simple_queue_type_ack_mode(Config, Type, AckMode, NMsgs) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), with_amqp10_session( @@ -255,7 +338,23 @@ simple_queue_type_ack_mode(Config, Type, AckMode) -> ExtraArgs = [{<<"ack-mode">>, AckMode}], ShovelArgs = ?config(shovel_args, Config) ++ ExtraArgs, set_param(Config, ?PARAM, ShovelArgs), - amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 10) + amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, NMsgs) + end). + +delete_after_never(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_amqp10_session( + Config, + fun (Sess) -> + set_param(Config, ?PARAM, + ?config(shovel_args, Config) ++ + [{<<"src-delete-after">>, <<"never">>}]), + amqp10_publish_expect(Sess, Src, Dest, <<"carrots">>, 5000), + ?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 5000}, _}], + rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_shovel_status, status, []), + 30000) end). autodelete_classic_on_confirm_no_transfer(Config) -> @@ -286,7 +385,7 @@ autodelete_no_ack(Config) -> ExtraArgs = [{<<"ack-mode">>, <<"no-ack">>}, {<<"src-delete-after">>, 100}], ShovelArgs = ?config(shovel_args, Config) ++ ExtraArgs, - Uri = shovel_test_utils:make_uri(Config, 0), + Uri = make_uri(Config, 0), ?assertMatch({error_string, _}, rabbit_ct_broker_helpers:rpc( Config, 0, rabbit_runtime_parameters, set, @@ -383,6 +482,113 @@ autodelete_with_quorum_rejections(Config, AckMode, ExpSrcFun) -> amqp10_expect_count(Sess, Dest, ExpDest) end). +no_vhost_access(Config) -> + AltVHost = ?config(alt_vhost, Config), + ok = rabbit_ct_broker_helpers:add_vhost(Config, AltVHost), + Uri = make_uri(Config, 0, AltVHost), + ExtraArgs = [{<<"src-uri">>, Uri}, {<<"dest-uri">>, [Uri]}], + ShovelArgs = ?config(shovel_args, Config) ++ ExtraArgs, + ok = rabbit_ct_broker_helpers:rpc( + Config, 0, rabbit_runtime_parameters, set, + [<<"/">>, <<"shovel">>, ?PARAM, ShovelArgs, none]), + await_no_shovel(Config, ?PARAM). + +no_user_access(Config) -> + Uri = make_uri( + Config, 0, <<"guest">>, <<"forgotmypassword">>, <<"%2F">>), + ShovelArgs = [{<<"src-uri">>, Uri}, + {<<"dest-uri">>, [Uri]}] ++ ?config(shovel_args, Config), + ok = rabbit_ct_broker_helpers:rpc( + Config, 0, rabbit_runtime_parameters, set, + [<<"/">>, <<"shovel">>, ?PARAM, ShovelArgs, none]), + await_no_shovel(Config, ?PARAM). + +application_properties(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_amqp10_session( + Config, + fun (Sess) -> + set_param(Config, ?PARAM, ?config(shovel_args, Config)), + Tag = <<"tag1">>, + Msg = amqp10_msg:set_application_properties( + #{<<"key">> => <<"value">>}, + amqp10_msg:set_headers( + #{durable => true}, + amqp10_msg:new(Tag, <<"hello">>, false))), + amqp10_publish_msg(Sess, Src, Tag, Msg), + MsgRcv = amqp10_expect_one(Sess, Dest), + AppProps = amqp10_msg:application_properties(MsgRcv), + ?assertMatch(#{<<"key">> := <<"value">>}, + AppProps) + end). + +delete_src_queue(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_amqp10_session(Config, + fun (Sess) -> + set_param(Config, ?PARAM, ?config(shovel_args, Config)), + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1), + ?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 1}, _}], + rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_shovel_status, status, []), + 30000), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queue, + [Src, <<"/">>]), + ?awaitMatch( + [[Dest, _], + [Src, _]], + lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, + ["list_queues", "name", "messages", "--no-table-headers"])), + 45_000), + ?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 0}, _}], + rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_shovel_status, status, []), + 30000), + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) + end). + +shovel_status(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + SrcProtocol = ?config(src_protocol, Config), + DestProtocol = ?config(dest_protocol, Config), + set_param(Config, ?PARAM, ?config(shovel_args, Config)), + Status = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_shovel_status, status, []), + ?assertMatch([{_, dynamic, {running, _}, _, _}], Status), + [{_, dynamic, {running, Info}, _, _}] = Status, + ?assertMatch(SrcProtocol, proplists:get_value(src_protocol, Info)), + ?assertMatch(DestProtocol, proplists:get_value(dest_protocol, Info)), + SrcAddress = binary_to_atom(binary:replace(?config(src_address, Config), <<"-">>, <<"_">>)), + DestAddress = binary_to_atom(binary:replace(?config(dest_address, Config), <<"-">>, <<"_">>)), + ?assertMatch(Src, proplists:get_value(SrcAddress, Info)), + ?assertMatch(Dest, proplists:get_value(DestAddress, Info)), + ok. + +change_definition(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + Dest2 = <>/binary>>, + DestAddress = ?config(dest_address, Config), + with_amqp10_session(Config, + fun (Sess) -> + ShovelArgs = ?config(shovel_args, Config), + set_param(Config, ?PARAM, ShovelArgs), + amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1), + ShovelArgs0 = proplists:delete(DestAddress, ShovelArgs), + ShovelArgs2 = [{DestAddress, Dest2} | ShovelArgs0], + set_param(Config, ?PARAM, ShovelArgs2), + amqp10_publish_expect(Sess, Src, Dest2, <<"hello">>, 1), + amqp10_expect_empty(Sess, Dest), + clear_param(Config, ?PARAM), + amqp10_publish_expect(Sess, Src, Src, <<"hello">>, 1), + amqp10_expect_empty(Sess, Dest), + amqp10_expect_empty(Sess, Dest2) + end). + %%---------------------------------------------------------------------------- maybe_skip_local_protocol(Config) -> [Node] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -402,3 +608,12 @@ list_queue_messages(Config, QName) -> Q == QName end, List), binary_to_integer(Messages). + +delete_queue(Name, VHost) -> + QName = rabbit_misc:r(VHost, queue, Name), + case rabbit_amqqueue:lookup(QName) of + {ok, Q} -> + {ok, _} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>); + _ -> + ok + end. diff --git a/deps/rabbitmq_shovel/test/shovel_test_utils.erl b/deps/rabbitmq_shovel/test/shovel_test_utils.erl index 4d5bace3157..9b69c666181 100644 --- a/deps/rabbitmq_shovel/test/shovel_test_utils.erl +++ b/deps/rabbitmq_shovel/test/shovel_test_utils.erl @@ -7,7 +7,9 @@ -module(shovel_test_utils). +-include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("common_test/include/ct.hrl"). + -export([set_param/3, set_param/4, set_param/5, set_param_nowait/3, await_shovel/2, await_shovel/3, await_shovel/4, await_shovel1/3, shovels_from_status/0, shovels_from_status/1, @@ -24,9 +26,13 @@ amqp10_expect_count/3, amqp10_expect/3, amqp10_publish_expect/5, amqp10_declare_queue/3, amqp10_subscribe/2, amqp10_expect/2, + amqp10_publish_msg/4, await_autodelete/2, await_autodelete1/2, invalid_param/2, invalid_param/3, - valid_param/2, valid_param/3, valid_param1/3]). + valid_param/2, valid_param/3, valid_param1/3, + with_amqp091_ch/2, amqp091_publish_expect/5, + amqp091_publish/4, amqp091_expect_empty/2, + amqp091_expect/3]). make_uri(Config, Node) -> Hostname = ?config(rmq_hostname, Config), @@ -111,7 +117,7 @@ await_credit(Sender) -> receive {amqp10_event, {link, Sender, credited}} -> ok - after 5_000 -> + after 15_000 -> flush("await_credit timed out"), ct:fail(credited_timeout) end. @@ -119,7 +125,7 @@ await_credit(Sender) -> await_amqp10_event(On, Ref, Evt) -> receive {amqp10_event, {On, Ref, Evt}} -> ok - after 5_000 -> + after 15_000 -> exit({amqp10_event_timeout, On, Ref, Evt}) end. @@ -209,10 +215,13 @@ amqp10_publish(Sender, Tag, Payload) when is_binary(Payload) -> Headers = #{durable => true}, Msg = amqp10_msg:set_headers(Headers, amqp10_msg:new(Tag, Payload, false)), + amqp10_publish_msg(Sender, Tag, Msg). + +amqp10_publish_msg(Sender, Tag, Msg) -> ok = amqp10_client:send_msg(Sender, Msg), receive {amqp10_disposition, {accepted, Tag}} -> ok - after 3000 -> + after 15000 -> exit(publish_disposition_not_received) end. @@ -230,6 +239,15 @@ amqp10_expect_empty(Session, Dest) -> end, amqp10_client:detach_link(Receiver). +amqp10_publish_msg(Session, Address, Tag, Msg) -> + LinkName = <<"dynamic-sender-", Address/binary>>, + {ok, Sender} = amqp10_client:attach_sender_link(Session, LinkName, Address, + unsettled, unsettled_state), + ok = await_amqp10_event(link, Sender, attached), + ok = await_credit(Sender), + amqp10_publish_msg(Sender, Tag, Msg), + amqp10_client:detach_link(Sender). + amqp10_publish(Session, Address, Payload, Count) -> LinkName = <<"dynamic-sender-", Address/binary>>, {ok, Sender} = amqp10_client:attach_sender_link(Session, LinkName, Address, @@ -265,7 +283,7 @@ amqp10_expect(Receiver, N, Acc) -> receive {amqp10_msg, Receiver, InMsg} -> amqp10_expect(Receiver, N - 1, [InMsg | Acc]) - after 4000 -> + after 15000 -> throw({timeout_in_expect_waiting_for_delivery, N, Acc}) end. @@ -273,7 +291,7 @@ amqp10_expect(Receiver) -> receive {amqp10_msg, Receiver, InMsg} -> InMsg - after 4000 -> + after 15000 -> throw(timeout_in_expect_waiting_for_delivery) end. @@ -339,3 +357,38 @@ valid_param1(_Config, Value, User) -> invalid_param(Config, Value) -> invalid_param(Config, Value, none). valid_param(Config, Value) -> valid_param(Config, Value, none). + +with_amqp091_ch(Config, Fun) -> + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Fun(Ch), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), + ok. + +amqp091_publish(Ch, X, Key, Payload) when is_binary(Payload) -> + amqp091_publish(Ch, X, Key, #amqp_msg{payload = Payload}); + +amqp091_publish(Ch, X, Key, Msg = #amqp_msg{}) -> + amqp_channel:cast(Ch, #'basic.publish'{exchange = X, + routing_key = Key}, Msg). + +amqp091_publish_expect(Ch, X, Key, Q, Payload) -> + amqp091_publish(Ch, X, Key, Payload), + amqp091_expect(Ch, Q, Payload). + +amqp091_expect(Ch, Q, Payload) -> + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, + no_ack = true}, self()), + CTag = receive + #'basic.consume_ok'{consumer_tag = CT} -> CT + end, + Msg = receive + {#'basic.deliver'{}, #amqp_msg{payload = Payload} = M} -> + M + after 15000 -> + exit({not_received, Payload}) + end, + amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}), + Msg. + +amqp091_expect_empty(Ch, Q) -> + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{ queue = Q }). From 402e5b7ad83ff5f1506651db2037bf5ebc28d13f Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Thu, 23 Oct 2025 17:37:09 +0200 Subject: [PATCH 2/4] Shovel AMQP091 bugfix: basic.cancel is a source message It is the source side of the shovel that must handle the basic.cancel message from the source queue. This fix makes 091-10 and 091-local work, handling the basic.cancel and restarting the shovel when required. --- deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl index 27f689923e5..cc7527cc991 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl @@ -229,6 +229,10 @@ handle_source({#'basic.deliver'{delivery_tag = Tag, % forward to destination rabbit_shovel_behaviour:forward(Tag, Msg, State); +handle_source(#'basic.cancel'{}, #{name := Name}) -> + ?LOG_WARNING("Shovel ~tp received a 'basic.cancel' from the server", [Name]), + {stop, {shutdown, restart}}; + handle_source({'EXIT', Conn, Reason}, #{source := #{current := {Conn, _, _}}}) -> {stop, {inbound_conn_died, Reason}}; @@ -251,10 +255,6 @@ handle_dest(#'basic.nack'{delivery_tag = Seq, multiple = Multiple}, rabbit_shovel_behaviour:nack(Tag, Multi, StateX) end, Seq, Multiple, State); -handle_dest(#'basic.cancel'{}, #{name := Name}) -> - ?LOG_WARNING("Shovel ~tp received a 'basic.cancel' from the server", [Name]), - {stop, {shutdown, restart}}; - handle_dest({'EXIT', Conn, Reason}, #{dest := #{current := {Conn, _, _}}}) -> {stop, {outbound_conn_died, Reason}}; From 85d0e472170c7a1646b274400bda409dd95a5451 Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Mon, 27 Oct 2025 08:40:00 +0100 Subject: [PATCH 3/4] Shovel bugfix: ensure initial metrics are reported for all protocols AMQP1.0 doesn't emit metrics right after the initial connection, it needs something to happen in the shovel to emit the first blocked status report. By emitting metrics on the first running report, all shovels will report the initial metrics as soon as they are initialised. --- .../src/rabbit_shovel_status.erl | 21 ++++++++++++++++++- .../src/rabbit_shovel_worker.erl | 4 +++- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_status.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_status.erl index 92dcc5bbe75..55c0920e8c1 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_status.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_status.erl @@ -14,7 +14,7 @@ -export([start_link/0]). --export([report/3, +-export([report/3, report/4, report_blocked_status/2, remove/1, status/0, @@ -78,6 +78,12 @@ start_link() -> report(Name, Type, Info) -> gen_server:cast(?SERVER, {report, Name, Type, Info, calendar:local_time()}). +-spec report(name(), type(), info(), metrics()) -> ok. +report(Name, Type, Info, Metrics) -> + %% Initialise metrics for protocols that don't immediately generate a + %% blocked status report. This happens with AMQP 1.0 + gen_server:cast(?SERVER, {report, Name, Type, Info, Metrics, calendar:local_time()}). + -spec report_blocked_status(name(), {blocked_status(), metrics()} | blocked_status()) -> ok. report_blocked_status(Name, Status) -> gen_server:cast(?SERVER, {report_blocked_status, Name, Status, erlang:monotonic_time()}). @@ -164,6 +170,19 @@ handle_cast({report, Name, Type, Info, Timestamp}, State) -> split_name(Name) ++ split_status(Info)), {noreply, State}; +handle_cast({report, Name, Type, Info, Metrics, Timestamp}, State) -> + Entry = #entry{ + name = Name, + type = Type, + info = Info, + metrics = Metrics, + timestamp = Timestamp + }, + true = ets:insert(?ETS_NAME, Entry), + rabbit_event:notify(shovel_worker_status, + split_name(Name) ++ split_status(Info)), + {noreply, State}; + handle_cast({report_blocked_status, Name, {Status, Metrics}, Timestamp}, State) -> case Status of flow -> diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl index f2ff7ffbea0..cd69e4076ff 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl @@ -279,13 +279,15 @@ report_running(#state{config = Config} = State) -> OutProto = rabbit_shovel_behaviour:dest_protocol(Config), InEndpoint = rabbit_shovel_behaviour:source_endpoint(Config), OutEndpoint = rabbit_shovel_behaviour:dest_endpoint(Config), + {_, Metrics} = rabbit_shovel_behaviour:status(Config), rabbit_shovel_status:report(State#state.name, State#state.type, {running, [{src_uri, rabbit_data_coercion:to_binary(InUri)}, {src_protocol, rabbit_data_coercion:to_binary(InProto)}, {dest_protocol, rabbit_data_coercion:to_binary(OutProto)}, {dest_uri, rabbit_data_coercion:to_binary(OutUri)}] ++ props_to_binary(InEndpoint) ++ props_to_binary(OutEndpoint) - }). + }, + Metrics). props_to_binary(Props) -> [{K, rabbit_data_coercion:to_binary(V)} || {K, V} <- Props]. From ef157e537fa0f31359b1110e31a4d73fa14d373e Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Mon, 27 Oct 2025 18:53:11 +0100 Subject: [PATCH 4/4] Shovel local bugfix: skip src_queue in endpoint info during exchange routing --- deps/rabbitmq_shovel/src/rabbit_local_shovel.erl | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl index 7310795ac9c..d8dd320b43c 100644 --- a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl @@ -244,12 +244,10 @@ source_protocol(_State) -> dest_protocol(_State) -> local. -source_endpoint(#{source := #{queue := Queue, - exchange := SrcX, +source_endpoint(#{source := #{exchange := SrcX, routing_key := SrcXKey}}) -> [{src_exchange, SrcX}, - {src_exchange_key, SrcXKey}, - {src_queue, Queue}]; + {src_exchange_key, SrcXKey}]; source_endpoint(#{source := #{queue := Queue}}) -> [{src_queue, Queue}]; source_endpoint(_Config) ->