Skip to content
This repository was archived by the owner on Nov 17, 2020. It is now read-only.

Commit 2541a23

Browse files
Implement federation-upstream-regex upstream parameter
1 parent 5083ed7 commit 2541a23

File tree

2 files changed

+33
-10
lines changed

2 files changed

+33
-10
lines changed

src/rabbit_federation_parameters.erl

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
{runtime_parameter, <<"federation-upstream">>},
2929
{runtime_parameter, <<"federation-upstream-set">>},
3030
{policy_validator, <<"federation-upstream">>},
31+
{policy_validator, <<"federation-upstream-regex">>},
3132
{policy_validator, <<"federation-upstream-set">>}]).
3233

3334
-rabbit_boot_step({?MODULE,
@@ -123,13 +124,22 @@ validate_policy([{<<"federation-upstream-set">>, Value}])
123124
validate_policy([{<<"federation-upstream-set">>, Value}]) ->
124125
{error, "~p is not a valid federation upstream set name", [Value]};
125126

127+
validate_policy([{<<"federation-upstream-regex">>, Value}])
128+
when is_binary(Value) ->
129+
case re:compile(Value) of
130+
{ok, _} -> ok;
131+
{error, Reason} -> {error, "~s should be regular expression "
132+
"but is invalid: ~p", [Value, Reason]}
133+
end;
134+
validate_policy([{<<"federation-upstream-regex">>, Value}]) ->
135+
{error, "~p is not a valid federation upstream re name", [Value]};
136+
126137
validate_policy([{<<"federation-upstream">>, Value}])
127138
when is_binary(Value) ->
128139
ok;
129140
validate_policy([{<<"federation-upstream">>, Value}]) ->
130141
{error, "~p is not a valid federation upstream name", [Value]};
131142

132-
validate_policy(L) when length(L) =:= 2 ->
143+
validate_policy(L) when length(L) >= 2 ->
133144
{error, "cannot specify federation-upstream and federation-upstream-set "
134-
"together", []}.
135-
145+
"or federation-upstream-regex together", []}.

src/rabbit_federation_upstream.erl

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
-export([federate/1, for/1, for/2, params_to_string/1, to_params/2]).
2424
%% For testing
25-
-export([from_set/2, remove_credentials/1]).
25+
-export([from_set/2, from_re/2, remove_credentials/1]).
2626

2727
-import(rabbit_misc, [pget/2, pget/3]).
2828
-import(rabbit_federation_util, [name/1, vhost/1, r/1]).
@@ -31,7 +31,8 @@
3131

3232
federate(XorQ) ->
3333
rabbit_policy:get(<<"federation-upstream">>, XorQ) =/= undefined orelse
34-
rabbit_policy:get(<<"federation-upstream-set">>, XorQ) =/= undefined.
34+
rabbit_policy:get(<<"federation-upstream-set">>, XorQ) =/= undefined orelse
35+
rabbit_policy:get(<<"federation-upstream-regex">>, XorQ) =/= undefined.
3536

3637
for(XorQ) ->
3738
case federate(XorQ) of
@@ -49,11 +50,13 @@ for(XorQ, UpstreamName) ->
4950
upstreams(XorQ) ->
5051
UName = rabbit_policy:get(<<"federation-upstream">>, XorQ),
5152
USetName = rabbit_policy:get(<<"federation-upstream-set">>, XorQ),
52-
%% Cannot define both, see rabbit_federation_parameters:validate_policy/1
53-
case {UName, USetName} of
54-
{undefined, undefined} -> [];
55-
{undefined, _} -> set_contents(USetName, vhost(XorQ));
56-
{_, undefined} -> [[{<<"upstream">>, UName}]]
53+
UReValue = rabbit_policy:get(<<"federation-upstream-regex">>, XorQ),
54+
%% Cannot define 2 at a time, see rabbit_federation_parameters:validate_policy/1
55+
case {UName, USetName, UReValue} of
56+
{undefined, undefined, undefined} -> [];
57+
{undefined, undefined, _} -> find_contents(UReValue, vhost(XorQ));
58+
{undefined, _, undefined} -> set_contents(USetName, vhost(XorQ));
59+
{_, undefined, undefined} -> [[{<<"upstream">>, UName}]]
5760
end.
5861

5962
params_table(SafeURI, XorQ) ->
@@ -87,6 +90,9 @@ print(Fmt, Args) -> iolist_to_binary(io_lib:format(Fmt, Args)).
8790
from_set(SetName, XorQ) ->
8891
from_set_contents(set_contents(SetName, vhost(XorQ)), XorQ).
8992

93+
from_re(SetName, XorQ) ->
94+
from_set_contents(find_contents(SetName, vhost(XorQ)), XorQ).
95+
9096
set_contents(<<"all">>, VHost) ->
9197
Upstreams0 = rabbit_runtime_parameters:list(
9298
VHost, <<"federation-upstream">>),
@@ -100,6 +106,13 @@ set_contents(SetName, VHost) ->
100106
Set -> Set
101107
end.
102108

109+
find_contents(RegExp, VHost) ->
110+
Upstreams0 = rabbit_runtime_parameters:list(
111+
VHost, <<"federation-upstream">>),
112+
Upstreams = [rabbit_data_coercion:to_list(U) || U <- Upstreams0,
113+
re:run(pget(name, U), RegExp) =/= nomatch],
114+
[[{<<"upstream">>, pget(name, U)}] || U <- Upstreams].
115+
103116
from_set_contents(Set, XorQ) ->
104117
Results = [from_set_element(P, XorQ) || P <- Set],
105118
[R || R <- Results, R =/= not_found].

0 commit comments

Comments
 (0)