Skip to content

Commit a120eae

Browse files
committed
Add queue type setting for exchange federation
Remove x-ha-policy (ignored since RabbitMQ 3.0)
1 parent 3a6a2f2 commit a120eae

File tree

8 files changed

+67
-16
lines changed

8 files changed

+67
-16
lines changed

deps/rabbitmq_federation/include/rabbit_federation.hrl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
message_ttl,
1717
trust_user_id,
1818
ack_mode,
19-
ha_policy,
19+
queue_type,
2020
name,
2121
bind_nowait,
2222
resource_cleanup_mode,
@@ -45,4 +45,4 @@
4545

4646
-define(FEDERATION_GUIDE_URL, <<"https://rabbitmq.com/federation.html">>).
4747

48-
-define(FEDERATION_PG_SCOPE, rabbitmq_federation_pg_scope).
48+
-define(FEDERATION_PG_SCOPE, rabbitmq_federation_pg_scope).

deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -504,15 +504,16 @@ consume_from_upstream_queue(
504504
#upstream{prefetch_count = Prefetch,
505505
expires = Expiry,
506506
message_ttl = TTL,
507-
ha_policy = HA} = Upstream,
507+
queue_type = QueueType} = Upstream,
508508
#upstream_params{x_or_q = X,
509509
params = Params} = UParams,
510510
Q = upstream_queue_name(name(X), vhost(Params), DownXName),
511511
Args = [A || {_K, _T, V} = A
512512
<- [{<<"x-expires">>, long, Expiry},
513513
{<<"x-message-ttl">>, long, TTL},
514-
{<<"x-ha-policy">>, longstr, HA},
515-
{<<"x-internal-purpose">>, longstr, <<"federation">>}],
514+
{<<"x-internal-purpose">>, longstr, <<"federation">>},
515+
{<<"x-queue-type">>, longstr, atom_to_binary(QueueType)}
516+
],
516517
V =/= none],
517518
amqp_channel:call(Ch, #'queue.declare'{queue = Q,
518519
durable = true,

deps/rabbitmq_federation/src/rabbit_federation_parameters.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ shared_validation() ->
8989
['no-ack', 'on-publish', 'on-confirm']), optional},
9090
{<<"resource-cleanup-mode">>, rabbit_parameter_validation:enum(
9191
['default', 'never']), optional},
92-
{<<"ha-policy">>, fun rabbit_parameter_validation:binary/2, optional},
92+
{<<"queue-type">>, rabbit_parameter_validation:enum(
93+
['classic', 'quorum']), optional},
9394
{<<"bind-nowait">>, fun rabbit_parameter_validation:boolean/2, optional},
9495
{<<"channel-use-mode">>, rabbit_parameter_validation:enum(
9596
['multiple', 'single']), optional}].

deps/rabbitmq_federation/src/rabbit_federation_upstream.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ from_upstream_or_set(US, Name, U, XorQ) ->
136136
message_ttl = bget('message-ttl', US, U, none),
137137
trust_user_id = bget('trust-user-id', US, U, false),
138138
ack_mode = to_atom(bget('ack-mode', US, U, <<"on-confirm">>)),
139-
ha_policy = bget('ha-policy', US, U, none),
139+
queue_type = to_atom(bget('queue-type', US, U, <<"classic">>)),
140140
name = Name,
141141
bind_nowait = bget('bind-nowait', US, U, false),
142142
resource_cleanup_mode = to_atom(bget('resource-cleanup-mode', US, U, <<"default">>)),

deps/rabbitmq_federation/test/exchange_SUITE.erl

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ groups() ->
5050
essential() ->
5151
[
5252
single_upstream,
53+
single_upstream_quorum,
5354
multiple_upstreams,
5455
multiple_upstreams_pattern,
5556
single_upstream_multiple_uris,
@@ -163,9 +164,46 @@ single_upstream(Config) ->
163164
await_binding(Config, 0, UpX, RK),
164165
publish_expect(Ch, UpX, RK, Q, <<"single_upstream payload">>),
165166

167+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
168+
assert_federation_internal_queue_type(Config, Server, rabbit_classic_queue),
169+
166170
rabbit_ct_client_helpers:close_channel(Ch),
167171
clean_up_federation_related_bits(Config).
168172

173+
single_upstream_quorum(Config) ->
174+
FedX = <<"single_upstream_quorum.federated">>,
175+
UpX = <<"single_upstream_quorum.upstream.x">>,
176+
rabbit_ct_broker_helpers:set_parameter(
177+
Config, 0, <<"federation-upstream">>, <<"localhost">>,
178+
[
179+
{<<"uri">>, rabbit_ct_broker_helpers:node_uri(Config, 0)},
180+
{<<"exchange">>, UpX},
181+
{<<"queue-type">>, <<"quorum">>}
182+
]),
183+
rabbit_ct_broker_helpers:set_policy(
184+
Config, 0,
185+
<<"fed.x">>, <<"^single_upstream_quorum.federated">>, <<"exchanges">>,
186+
[
187+
{<<"federation-upstream">>, <<"localhost">>}
188+
]),
189+
190+
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
191+
192+
Xs = [
193+
exchange_declare_method(FedX)
194+
],
195+
declare_exchanges(Ch, Xs),
196+
197+
RK = <<"key">>,
198+
Q = declare_and_bind_queue(Ch, FedX, RK),
199+
await_binding(Config, 0, UpX, RK),
200+
publish_expect(Ch, UpX, RK, Q, <<"single_upstream_quorum payload">>),
201+
202+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
203+
assert_federation_internal_queue_type(Config, Server, rabbit_quorum_queue),
204+
205+
rabbit_ct_client_helpers:close_channel(Ch),
206+
clean_up_federation_related_bits(Config).
169207

170208
multiple_upstreams(Config) ->
171209
FedX = <<"multiple_upstreams.federated">>,
@@ -870,3 +908,14 @@ await_credentials_obfuscation_seeding_on_two_nodes(Config) ->
870908
end),
871909

872910
timer:sleep(1000).
911+
912+
assert_federation_internal_queue_type(Config, Server, Expected) ->
913+
Qs = all_queues_on(Config, Server),
914+
FedQs = lists:filter(
915+
fun(Q) ->
916+
lists:member(
917+
{<<"x-internal-purpose">>, longstr, <<"federation">>}, amqqueue:get_arguments(Q))
918+
end,
919+
Qs),
920+
FedQTypes = lists:map(fun(Q) -> amqqueue:get_type(Q) end, FedQs),
921+
?assertEqual([Expected], lists:uniq(FedQTypes)).

deps/rabbitmq_federation_management/priv/www/js/federation.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ HELP['federation-expires'] =
7575
HELP['federation-ttl'] =
7676
'Time in milliseconds that undelivered messages should be held upstream when there is a network outage or backlog. Leave this blank to mean "forever".';
7777

78-
HELP['ha-policy'] =
79-
'Determines the "x-ha-policy" argument for the upstream queue for a federated exchange. Default is "none", meaning the queue is not HA.';
78+
HELP['queue-type'] =
79+
'Defines the queue type for the upstream queue for a federated exchange. Default is "classic". Set to "quorum" for high availability.';
8080

8181
HELP['queue'] =
8282
'The name of the upstream queue. Default is to use the same name as the federated queue.';

deps/rabbitmq_federation_management/priv/www/js/tmpl/federation-upstream.ejs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@
5656
</tr>
5757

5858
<tr>
59-
<th>HA Policy</th>
60-
<td><%= fmt_string(upstream.value['ha-policy']) %></td>
59+
<th>Queue Type</th>
60+
<td><%= fmt_string(upstream.value['queue-type']) %></td>
6161
</tr>
6262

6363
<tr>

deps/rabbitmq_federation_management/priv/www/js/tmpl/federation-upstreams.ejs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
<th>Max Hops</th>
2020
<th>Expiry</th>
2121
<th>Message TTL</th>
22-
<th>HA Policy</th>
22+
<th>Queue Type</th>
2323
<th>Queue</th>
2424
<th>Consumer tag</th>
2525
</tr>
@@ -43,7 +43,7 @@
4343
<td class="r"><%= upstream.value['max-hops'] %></td>
4444
<td class="r"><%= fmt_time(upstream.value.expires, 'ms') %></td>
4545
<td class="r"><%= fmt_time(upstream.value['message-ttl'], 'ms') %></td>
46-
<td class="r"><%= fmt_string(upstream.value['ha-policy']) %></td>
46+
<td class="r"><%= fmt_string(upstream.value['queue-type']) %></td>
4747
<td class="r"><%= fmt_string(upstream.value['queue']) %></td>
4848
<td class="r"><%= fmt_string(upstream.value['consumer-tag']) %></td>
4949
</tr>
@@ -195,11 +195,11 @@
195195
<tr>
196196
<th>
197197
<label>
198-
HA Policy:
199-
<span class="help" id="ha-policy"></span>
198+
Queue Type:
199+
<span class="help" id="queue-type"></span>
200200
</label>
201201
</th>
202-
<td><input type="text" name="ha-policy"/></td>
202+
<td><input type="text" name="queue-type"/></td>
203203
</tr>
204204
</tr>
205205

0 commit comments

Comments
 (0)