From d5c667815bb02686c5c7ded26cd7a17ecf0966be Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Thu, 29 Feb 2024 12:31:40 +0100 Subject: [PATCH] Add queue type setting for exchange federation Remove x-ha-policy (ignored since RabbitMQ 3.0) (cherry picked from commit 2e8ff3aaee6418473cc5d869f470d229701bc267) --- .../include/rabbit_federation.hrl | 4 +- .../src/rabbit_federation_exchange_link.erl | 7 +-- .../src/rabbit_federation_parameters.erl | 3 +- .../src/rabbit_federation_upstream.erl | 2 +- .../test/exchange_SUITE.erl | 49 +++++++++++++++++++ .../priv/www/js/federation.js | 4 +- .../priv/www/js/tmpl/federation-upstream.ejs | 4 +- .../priv/www/js/tmpl/federation-upstreams.ejs | 10 ++-- 8 files changed, 67 insertions(+), 16 deletions(-) diff --git a/deps/rabbitmq_federation/include/rabbit_federation.hrl b/deps/rabbitmq_federation/include/rabbit_federation.hrl index d3bfc9402148..991dec1ed9d5 100644 --- a/deps/rabbitmq_federation/include/rabbit_federation.hrl +++ b/deps/rabbitmq_federation/include/rabbit_federation.hrl @@ -16,7 +16,7 @@ message_ttl, trust_user_id, ack_mode, - ha_policy, + queue_type, name, bind_nowait, resource_cleanup_mode, @@ -45,4 +45,4 @@ -define(FEDERATION_GUIDE_URL, <<"https://rabbitmq.com/federation.html">>). --define(FEDERATION_PG_SCOPE, rabbitmq_federation_pg_scope). \ No newline at end of file +-define(FEDERATION_PG_SCOPE, rabbitmq_federation_pg_scope). diff --git a/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl index c8f58e41ec3b..62920712ec38 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl @@ -504,15 +504,16 @@ consume_from_upstream_queue( #upstream{prefetch_count = Prefetch, expires = Expiry, message_ttl = TTL, - ha_policy = HA} = Upstream, + queue_type = QueueType} = Upstream, #upstream_params{x_or_q = X, params = Params} = UParams, Q = upstream_queue_name(name(X), vhost(Params), DownXName), Args = [A || {_K, _T, V} = A <- [{<<"x-expires">>, long, Expiry}, {<<"x-message-ttl">>, long, TTL}, - {<<"x-ha-policy">>, longstr, HA}, - {<<"x-internal-purpose">>, longstr, <<"federation">>}], + {<<"x-internal-purpose">>, longstr, <<"federation">>}, + {<<"x-queue-type">>, longstr, atom_to_binary(QueueType)} + ], V =/= none], amqp_channel:call(Ch, #'queue.declare'{queue = Q, durable = true, diff --git a/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl b/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl index 4cd92554b632..ff803ec162c5 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl @@ -89,7 +89,8 @@ shared_validation() -> ['no-ack', 'on-publish', 'on-confirm']), optional}, {<<"resource-cleanup-mode">>, rabbit_parameter_validation:enum( ['default', 'never']), optional}, - {<<"ha-policy">>, fun rabbit_parameter_validation:binary/2, optional}, + {<<"queue-type">>, rabbit_parameter_validation:enum( + ['classic', 'quorum']), optional}, {<<"bind-nowait">>, fun rabbit_parameter_validation:boolean/2, optional}, {<<"channel-use-mode">>, rabbit_parameter_validation:enum( ['multiple', 'single']), optional}]. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl index 66fab5d58a38..7e303a030856 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl @@ -136,7 +136,7 @@ from_upstream_or_set(US, Name, U, XorQ) -> message_ttl = bget('message-ttl', US, U, none), trust_user_id = bget('trust-user-id', US, U, false), ack_mode = to_atom(bget('ack-mode', US, U, <<"on-confirm">>)), - ha_policy = bget('ha-policy', US, U, none), + queue_type = to_atom(bget('queue-type', US, U, <<"classic">>)), name = Name, bind_nowait = bget('bind-nowait', US, U, false), resource_cleanup_mode = to_atom(bget('resource-cleanup-mode', US, U, <<"default">>)), diff --git a/deps/rabbitmq_federation/test/exchange_SUITE.erl b/deps/rabbitmq_federation/test/exchange_SUITE.erl index 0f920caca0d7..ffa61520123a 100644 --- a/deps/rabbitmq_federation/test/exchange_SUITE.erl +++ b/deps/rabbitmq_federation/test/exchange_SUITE.erl @@ -50,6 +50,7 @@ groups() -> essential() -> [ single_upstream, + single_upstream_quorum, multiple_upstreams, multiple_upstreams_pattern, single_upstream_multiple_uris, @@ -163,9 +164,46 @@ single_upstream(Config) -> await_binding(Config, 0, UpX, RK), publish_expect(Ch, UpX, RK, Q, <<"single_upstream payload">>), + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + assert_federation_internal_queue_type(Config, Server, rabbit_classic_queue), + rabbit_ct_client_helpers:close_channel(Ch), clean_up_federation_related_bits(Config). +single_upstream_quorum(Config) -> + FedX = <<"single_upstream_quorum.federated">>, + UpX = <<"single_upstream_quorum.upstream.x">>, + rabbit_ct_broker_helpers:set_parameter( + Config, 0, <<"federation-upstream">>, <<"localhost">>, + [ + {<<"uri">>, rabbit_ct_broker_helpers:node_uri(Config, 0)}, + {<<"exchange">>, UpX}, + {<<"queue-type">>, <<"quorum">>} + ]), + rabbit_ct_broker_helpers:set_policy( + Config, 0, + <<"fed.x">>, <<"^single_upstream_quorum.federated">>, <<"exchanges">>, + [ + {<<"federation-upstream">>, <<"localhost">>} + ]), + + Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + + Xs = [ + exchange_declare_method(FedX) + ], + declare_exchanges(Ch, Xs), + + RK = <<"key">>, + Q = declare_and_bind_queue(Ch, FedX, RK), + await_binding(Config, 0, UpX, RK), + publish_expect(Ch, UpX, RK, Q, <<"single_upstream_quorum payload">>), + + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + assert_federation_internal_queue_type(Config, Server, rabbit_quorum_queue), + + rabbit_ct_client_helpers:close_channel(Ch), + clean_up_federation_related_bits(Config). multiple_upstreams(Config) -> FedX = <<"multiple_upstreams.federated">>, @@ -870,3 +908,14 @@ await_credentials_obfuscation_seeding_on_two_nodes(Config) -> end), timer:sleep(1000). + +assert_federation_internal_queue_type(Config, Server, Expected) -> + Qs = all_queues_on(Config, Server), + FedQs = lists:filter( + fun(Q) -> + lists:member( + {<<"x-internal-purpose">>, longstr, <<"federation">>}, amqqueue:get_arguments(Q)) + end, + Qs), + FedQTypes = lists:map(fun(Q) -> amqqueue:get_type(Q) end, FedQs), + ?assertEqual([Expected], lists:uniq(FedQTypes)). diff --git a/deps/rabbitmq_federation_management/priv/www/js/federation.js b/deps/rabbitmq_federation_management/priv/www/js/federation.js index 4ea78c932dbe..e31fbae6a685 100644 --- a/deps/rabbitmq_federation_management/priv/www/js/federation.js +++ b/deps/rabbitmq_federation_management/priv/www/js/federation.js @@ -75,8 +75,8 @@ HELP['federation-expires'] = HELP['federation-ttl'] = 'Time in milliseconds that undelivered messages should be held upstream when there is a network outage or backlog. Leave this blank to mean "forever".'; -HELP['ha-policy'] = - 'Determines the "x-ha-policy" argument for the upstream queue for a federated exchange. Default is "none", meaning the queue is not HA.'; +HELP['queue-type'] = + 'Defines the queue type for the upstream queue for a federated exchange. Default is "classic". Set to "quorum" for high availability.'; HELP['queue'] = 'The name of the upstream queue. Default is to use the same name as the federated queue.'; diff --git a/deps/rabbitmq_federation_management/priv/www/js/tmpl/federation-upstream.ejs b/deps/rabbitmq_federation_management/priv/www/js/tmpl/federation-upstream.ejs index 6fad08dc931b..d6918b79fd2e 100644 --- a/deps/rabbitmq_federation_management/priv/www/js/tmpl/federation-upstream.ejs +++ b/deps/rabbitmq_federation_management/priv/www/js/tmpl/federation-upstream.ejs @@ -56,8 +56,8 @@ - HA Policy - <%= fmt_string(upstream.value['ha-policy']) %> + Queue Type + <%= fmt_string(upstream.value['queue-type']) %> diff --git a/deps/rabbitmq_federation_management/priv/www/js/tmpl/federation-upstreams.ejs b/deps/rabbitmq_federation_management/priv/www/js/tmpl/federation-upstreams.ejs index 838eac1eb3b4..3e5504671ed0 100644 --- a/deps/rabbitmq_federation_management/priv/www/js/tmpl/federation-upstreams.ejs +++ b/deps/rabbitmq_federation_management/priv/www/js/tmpl/federation-upstreams.ejs @@ -19,7 +19,7 @@ Max Hops Expiry Message TTL - HA Policy + Queue Type Queue Consumer tag @@ -43,7 +43,7 @@ <%= upstream.value['max-hops'] %> <%= fmt_time(upstream.value.expires, 'ms') %> <%= fmt_time(upstream.value['message-ttl'], 'ms') %> - <%= fmt_string(upstream.value['ha-policy']) %> + <%= fmt_string(upstream.value['queue-type']) %> <%= fmt_string(upstream.value['queue']) %> <%= fmt_string(upstream.value['consumer-tag']) %> @@ -195,11 +195,11 @@ - +