diff --git a/deps/rabbit/src/rabbit_definitions.erl b/deps/rabbit/src/rabbit_definitions.erl index 216c6a086e16..c3a673492f5d 100644 --- a/deps/rabbit/src/rabbit_definitions.erl +++ b/deps/rabbit/src/rabbit_definitions.erl @@ -1112,10 +1112,10 @@ runtime_parameter_definition(Param) -> maybe_map(Value) -> %% Not all definitions are maps. `federation-upstream-set` is - %% a list of maps, and it should be exported as it has been - %% imported + %% a list of maps. We also need to recursively convert nested + %% proplists to maps (e.g. policy and operator policy definitions). try - rabbit_data_coercion:to_map(Value) + rabbit_data_coercion:to_map_recursive(Value) catch error:badarg -> Value diff --git a/deps/rabbit/test/definition_import_SUITE.erl b/deps/rabbit/test/definition_import_SUITE.erl index 4d5758d0c47f..a5fe785c9474 100644 --- a/deps/rabbit/test/definition_import_SUITE.erl +++ b/deps/rabbit/test/definition_import_SUITE.erl @@ -71,7 +71,12 @@ groups() -> {roundtrip, [], [ export_import_round_trip_case1, - export_import_round_trip_case2 + export_import_round_trip_case2, + export_import_round_trip_operator_policy, + export_import_round_trip_vhost_limits, + export_import_round_trip_operator_policy_all_keys, + export_import_round_trip_multiple_operator_policies, + export_import_round_trip_mixed_parameters ]}, {skip_if_unchanged, [], [ @@ -358,6 +363,105 @@ export_import_round_trip_case2(Config) -> {skip, "Should not run in mixed version environments"} end. +run_if_not_mixed_versions(Fun) -> + case rabbit_ct_helpers:is_mixed_versions() of + false -> Fun(); + _ -> {skip, "Should not run in mixed version environments"} + end. + +export_import_round_trip_operator_policy(Config) -> + run_if_not_mixed_versions(fun() -> + import_file_case(Config, "case23"), + Defs = export(Config), + Parameters = maps:get(parameters, Defs, []), + [OpPol] = [P || P <- Parameters, + maps:get(<<"component">>, P) =:= <<"operator_policy">>, + maps:get(<<"name">>, P) =:= <<"transient-queue-ttl">>], + Value = maps:get(<<"value">>, OpPol), + Definition = maps:get(<<"definition">>, Value), + ?assert(is_map(Definition)), + ?assertEqual(1800000, maps:get(<<"expires">>, Definition)), + ?assertEqual(60000, maps:get(<<"message-ttl">>, Definition)), + ?assertEqual(1000, maps:get(<<"max-length">>, Definition)), + import_parsed(Config, Defs) + end). + +export_import_round_trip_vhost_limits(Config) -> + run_if_not_mixed_versions(fun() -> + import_file_case(Config, "case6"), + Defs = export(Config), + Parameters = maps:get(parameters, Defs, []), + VHostLimits = lists:filter( + fun(P) -> + maps:get(<<"component">>, P, undefined) =:= <<"vhost-limits">> + end, + Parameters + ), + ?assertEqual(1, length(VHostLimits)), + [Limits] = VHostLimits, + Value = maps:get(<<"value">>, Limits), + ?assert(is_map(Value)), + ?assertEqual(456, maps:get(<<"max-queues">>, Value)), + ?assertEqual(123, maps:get(<<"max-connections">>, Value)), + import_parsed(Config, Defs) + end). + +export_import_round_trip_operator_policy_all_keys(Config) -> + run_if_not_mixed_versions(fun() -> + import_file_case(Config, "case24"), + Defs = export(Config), + Parameters = maps:get(parameters, Defs, []), + [OpPol] = [P || P <- Parameters, + maps:get(<<"component">>, P) =:= <<"operator_policy">>, + maps:get(<<"name">>, P) =:= <<"regulated-queues">>], + Value = maps:get(<<"value">>, OpPol), + Definition = maps:get(<<"definition">>, Value), + ?assert(is_map(Definition)), + ?assertEqual(4, maps:size(Definition)), + ?assertEqual(3600000, maps:get(<<"expires">>, Definition)), + ?assertEqual(300000, maps:get(<<"message-ttl">>, Definition)), + ?assertEqual(10000, maps:get(<<"max-length">>, Definition)), + ?assertEqual(104857600, maps:get(<<"max-length-bytes">>, Definition)), + import_parsed(Config, Defs) + end). + +export_import_round_trip_multiple_operator_policies(Config) -> + run_if_not_mixed_versions(fun() -> + import_file_case(Config, "case25"), + Defs = export(Config), + Parameters = maps:get(parameters, Defs, []), + ExpectedPolicies = [<<"transient-queues">>, <<"limited-queues">>, <<"archive-queues">>], + lists:foreach(fun(PolicyName) -> + [OpPol] = [P || P <- Parameters, + maps:get(<<"component">>, P) =:= <<"operator_policy">>, + maps:get(<<"name">>, P) =:= PolicyName], + Value = maps:get(<<"value">>, OpPol), + Definition = maps:get(<<"definition">>, Value), + ?assert(is_map(Definition)), + ?assert(maps:size(Definition) >= 2) + end, ExpectedPolicies), + import_parsed(Config, Defs) + end). + +export_import_round_trip_mixed_parameters(Config) -> + run_if_not_mixed_versions(fun() -> + import_file_case(Config, "case26"), + Defs = export(Config), + Parameters = maps:get(parameters, Defs, []), + [Limits] = [P || P <- Parameters, + maps:get(<<"component">>, P) =:= <<"vhost-limits">>, + maps:get(<<"name">>, P) =:= <<"limits">>], + LimitsValue = maps:get(<<"value">>, Limits), + ?assert(is_map(LimitsValue)), + [OpPol] = [P || P <- Parameters, + maps:get(<<"component">>, P) =:= <<"operator_policy">>, + maps:get(<<"name">>, P) =:= <<"temp-queues">>], + OpPolValue = maps:get(<<"value">>, OpPol), + OpPolDefinition = maps:get(<<"definition">>, OpPolValue), + ?assert(is_map(OpPolDefinition)), + import_parsed(Config, Defs) + end). + import_on_a_booting_node_using_classic_local_source(Config) -> %% see case5.json VHost = <<"vhost2">>, diff --git a/deps/rabbit/test/definition_import_SUITE_data/case23.json b/deps/rabbit/test/definition_import_SUITE_data/case23.json new file mode 100644 index 000000000000..df3ba55d7a0a --- /dev/null +++ b/deps/rabbit/test/definition_import_SUITE_data/case23.json @@ -0,0 +1,48 @@ +{ + "rabbit_version": "4.2.0", + "users": [ + { + "name": "guest", + "password_hash": "J+UiUxNQ3I8uPn6Lo2obWcl93VgXgbw4R+xhl3L5zHwkRFZG", + "hashing_algorithm": "rabbit_password_hashing_sha256", + "tags": "administrator" + } + ], + "vhosts": [ + { + "name": "/" + } + ], + "permissions": [ + { + "user": "guest", + "vhost": "/", + "configure": ".*", + "write": ".*", + "read": ".*" + } + ], + "topic_permissions": [], + "parameters": [ + { + "value": { + "pattern": "^amq\\.", + "definition": { + "expires": 1800000, + "message-ttl": 60000, + "max-length": 1000 + }, + "priority": 1, + "apply-to": "queues" + }, + "vhost": "/", + "component": "operator_policy", + "name": "transient-queue-ttl" + } + ], + "global_parameters": [], + "policies": [], + "queues": [], + "exchanges": [], + "bindings": [] +} diff --git a/deps/rabbit/test/definition_import_SUITE_data/case24.json b/deps/rabbit/test/definition_import_SUITE_data/case24.json new file mode 100644 index 000000000000..6247063efa2a --- /dev/null +++ b/deps/rabbit/test/definition_import_SUITE_data/case24.json @@ -0,0 +1,49 @@ +{ + "rabbit_version": "4.2.0", + "users": [ + { + "name": "guest", + "password_hash": "J+UiUxNQ3I8uPn6Lo2obWcl93VgXgbw4R+xhl3L5zHwkRFZG", + "hashing_algorithm": "rabbit_password_hashing_sha256", + "tags": "administrator" + } + ], + "vhosts": [ + { + "name": "/" + } + ], + "permissions": [ + { + "user": "guest", + "vhost": "/", + "configure": ".*", + "read": ".*", + "write": ".*" + } + ], + "topic_permissions": [], + "parameters": [ + { + "value": { + "pattern": "^regulated\\.", + "definition": { + "expires": 3600000, + "message-ttl": 300000, + "max-length": 10000, + "max-length-bytes": 104857600 + }, + "priority": 1, + "apply-to": "queues" + }, + "vhost": "/", + "component": "operator_policy", + "name": "regulated-queues" + } + ], + "global_parameters": [], + "policies": [], + "queues": [], + "exchanges": [], + "bindings": [] +} diff --git a/deps/rabbit/test/definition_import_SUITE_data/case25.json b/deps/rabbit/test/definition_import_SUITE_data/case25.json new file mode 100644 index 000000000000..5efebf397ac2 --- /dev/null +++ b/deps/rabbit/test/definition_import_SUITE_data/case25.json @@ -0,0 +1,77 @@ +{ + "rabbit_version": "4.2.0", + "users": [ + { + "name": "guest", + "password_hash": "J+UiUxNQ3I8uPn6Lo2obWcl93VgXgbw4R+xhl3L5zHwkRFZG", + "hashing_algorithm": "rabbit_password_hashing_sha256", + "tags": "administrator" + } + ], + "vhosts": [ + { + "name": "/" + } + ], + "permissions": [ + { + "user": "guest", + "vhost": "/", + "configure": ".*", + "read": ".*", + "write": ".*" + } + ], + "topic_permissions": [], + "parameters": [ + { + "value": { + "pattern": "^transient\\.", + "definition": { + "expires": 300000, + "message-ttl": 60000 + }, + "priority": 10, + "apply-to": "queues" + }, + "vhost": "/", + "component": "operator_policy", + "name": "transient-queues" + }, + { + "value": { + "pattern": "^limited\\.", + "definition": { + "max-length": 5000, + "max-length-bytes": 10485760 + }, + "priority": 5, + "apply-to": "queues" + }, + "vhost": "/", + "component": "operator_policy", + "name": "limited-queues" + }, + { + "value": { + "pattern": "^archive\\.", + "definition": { + "expires": 2592000000, + "max-length": 100000, + "message-ttl": 86400000, + "max-length-bytes": 1073741824 + }, + "priority": 1, + "apply-to": "queues" + }, + "vhost": "/", + "component": "operator_policy", + "name": "archive-queues" + } + ], + "global_parameters": [], + "policies": [], + "queues": [], + "exchanges": [], + "bindings": [] +} diff --git a/deps/rabbit/test/definition_import_SUITE_data/case26.json b/deps/rabbit/test/definition_import_SUITE_data/case26.json new file mode 100644 index 000000000000..36837248a6c0 --- /dev/null +++ b/deps/rabbit/test/definition_import_SUITE_data/case26.json @@ -0,0 +1,56 @@ +{ + "rabbit_version": "4.2.0", + "users": [ + { + "name": "guest", + "password_hash": "J+UiUxNQ3I8uPn6Lo2obWcl93VgXgbw4R+xhl3L5zHwkRFZG", + "hashing_algorithm": "rabbit_password_hashing_sha256", + "tags": "administrator" + } + ], + "vhosts": [ + { + "name": "/" + } + ], + "permissions": [ + { + "user": "guest", + "vhost": "/", + "configure": ".*", + "read": ".*", + "write": ".*" + } + ], + "topic_permissions": [], + "parameters": [ + { + "value": { + "max-connections": 500, + "max-queues": 1000 + }, + "vhost": "/", + "component": "vhost-limits", + "name": "limits" + }, + { + "value": { + "pattern": "^temp\\.", + "definition": { + "expires": 1800000, + "max-length": 2000 + }, + "priority": 1, + "apply-to": "queues" + }, + "vhost": "/", + "component": "operator_policy", + "name": "temp-queues" + } + ], + "global_parameters": [], + "policies": [], + "queues": [], + "exchanges": [], + "bindings": [] +} diff --git a/deps/rabbit_common/src/rabbit_data_coercion.erl b/deps/rabbit_common/src/rabbit_data_coercion.erl index 7dbe1b5b551e..b8475dfefbf8 100644 --- a/deps/rabbit_common/src/rabbit_data_coercion.erl +++ b/deps/rabbit_common/src/rabbit_data_coercion.erl @@ -7,7 +7,7 @@ -module(rabbit_data_coercion). --export([to_binary/1, to_list/1, to_atom/1, to_integer/1, to_proplist/1, to_map/1]). +-export([to_binary/1, to_list/1, to_atom/1, to_integer/1, to_proplist/1, to_map/1, to_map_recursive/1]). -export([to_atom/2, atomize_keys/1, to_list_of_binaries/1]). -export([to_utf8_binary/1, to_unicode_charlist/1]). -export([as_list/1]). @@ -115,4 +115,48 @@ to_unicode_charlist(Val) -> as_list(Nodes) when is_list(Nodes) -> Nodes; as_list(Other) -> - [Other]. \ No newline at end of file + [Other]. + +-spec to_map_recursive(Val :: map() | list() | any()) -> map() | list() | any(). +%% Recursively convert proplists to maps. +to_map_recursive([]) -> + []; +to_map_recursive(Value) when is_list(Value) -> + case is_proplist(Value) of + true -> + %% Normalize proplist by expanding bare atoms to {Atom, true} + Normalized = normalize_proplist(Value), + M = to_map(Normalized), + maps:map(fun(_K, V) -> to_map_recursive(V) end, M); + false -> + %% The input is a regular list (e.g., federation-upstream-set) + [to_map_recursive(V) || V <- Value] + end; +to_map_recursive(Value) when is_map(Value) -> + %% Recursively process map values + maps:map(fun(_K, V) -> to_map_recursive(V) end, Value); +to_map_recursive(Value) -> + %% Base case: atomic values (binary, integer, atom, etc.) + Value. + +-spec normalize_proplist(list()) -> [{atom() | binary() | list(), any()}]. +normalize_proplist([]) -> + []; +normalize_proplist([{K, V} | Rest]) -> + [{K, V} | normalize_proplist(Rest)]; +normalize_proplist([Atom | Rest]) when is_atom(Atom) -> + [{Atom, true} | normalize_proplist(Rest)]. + +-spec is_proplist(any()) -> boolean(). +is_proplist(List) when is_list(List) -> + lists:all(fun is_proplist_element/1, List); +is_proplist(_) -> + false. + +-spec is_proplist_element(any()) -> boolean(). +is_proplist_element({Key, _Value}) when is_atom(Key); is_binary(Key); is_list(Key) -> + true; +is_proplist_element(Atom) when is_atom(Atom) -> + true; +is_proplist_element(_) -> + false. diff --git a/deps/rabbit_common/test/unit_SUITE.erl b/deps/rabbit_common/test/unit_SUITE.erl index 43e5a841dba2..b18c7f1ea2e2 100644 --- a/deps/rabbit_common/test/unit_SUITE.erl +++ b/deps/rabbit_common/test/unit_SUITE.erl @@ -13,6 +13,8 @@ -include("rabbit.hrl"). +-import(rabbit_data_coercion, [to_map_recursive/1]). + -compile(export_all). %% This cipher is listed as supported, but doesn't actually work. @@ -35,6 +37,17 @@ groups() -> data_coercion_to_proplist, data_coercion_to_list, data_coercion_to_map, + data_coercion_to_map_recursive_proplist, + data_coercion_to_map_recursive_nested_proplist, + data_coercion_to_map_recursive_mixed_structures, + data_coercion_to_map_recursive_bare_atoms, + data_coercion_to_map_recursive_empty_list, + data_coercion_to_map_recursive_list_of_maps, + data_coercion_to_map_recursive_deep_nesting, + data_coercion_to_map_recursive_atomic_values, + data_coercion_to_map_recursive_binary_keys, + data_coercion_to_map_recursive_atom_list_limitation, + data_coercion_to_map_recursive_property, data_coercion_atomize_keys_proplist, data_coercion_atomize_keys_map, pget, @@ -278,6 +291,98 @@ data_coercion_to_list(_Config) -> ?assertEqual([{a, 1}], rabbit_data_coercion:to_list([{a, 1}])), ?assertEqual([{a, 1}], rabbit_data_coercion:to_list(#{a => 1})). +data_coercion_to_map_recursive_proplist(_Config) -> + ?assertEqual(#{a => 1, b => 2}, + to_map_recursive([{a, 1}, {b, 2}])), + ?assertEqual(#{a => 1}, + to_map_recursive(#{a => 1})). + +data_coercion_to_map_recursive_nested_proplist(_Config) -> + Input = [{pattern, <<"^amq\\.">>}, + {definition, [{expires, 1800000}, {ttl, 60000}]}, + {priority, 1}], + Expected = #{pattern => <<"^amq\\.">>, + definition => #{expires => 1800000, ttl => 60000}, + priority => 1}, + ?assertEqual(Expected, to_map_recursive(Input)). + +data_coercion_to_map_recursive_mixed_structures(_Config) -> + Input = [{vhost, <<"/">>}, + {upstreams, [[{uri, <<"amqp://server1">>}], + [{uri, <<"amqp://server2">>}]]}], + Expected = #{vhost => <<"/">>, + upstreams => [#{uri => <<"amqp://server1">>}, + #{uri => <<"amqp://server2">>}]}, + ?assertEqual(Expected, to_map_recursive(Input)). + +data_coercion_to_map_recursive_bare_atoms(_Config) -> + Input = [{durable, true}, auto_delete, {exclusive, false}], + Expected = #{durable => true, auto_delete => true, exclusive => false}, + ?assertEqual(Expected, to_map_recursive(Input)). + +data_coercion_to_map_recursive_empty_list(_Config) -> + ?assertEqual([], to_map_recursive([])). + +data_coercion_to_map_recursive_list_of_maps(_Config) -> + Input = [#{a => 1}, #{b => 2}, #{c => [{nested, 3}]}], + Expected = [#{a => 1}, #{b => 2}, #{c => #{nested => 3}}], + ?assertEqual(Expected, to_map_recursive(Input)). + +data_coercion_to_map_recursive_deep_nesting(_Config) -> + Input = [{level1, [{level2, [{level3, [{level4, value}]}]}]}], + Expected = #{level1 => #{level2 => #{level3 => #{level4 => value}}}}, + ?assertEqual(Expected, to_map_recursive(Input)). + +data_coercion_to_map_recursive_atomic_values(_Config) -> + ?assertEqual(42, to_map_recursive(42)), + ?assertEqual(<<"binary">>, to_map_recursive(<<"binary">>)), + ?assertEqual(atom, to_map_recursive(atom)), + ?assertEqual("string", to_map_recursive("string")). + +data_coercion_to_map_recursive_binary_keys(_Config) -> + Input = [{<<"apply-to">>, <<"queues">>}, + {<<"pattern">>, <<"^abc">>}, + {<<"definition">>, [{<<"expires">>, 1800000}]}], + Expected = #{<<"apply-to">> => <<"queues">>, + <<"pattern">> => <<"^abc">>, + <<"definition">> => #{<<"expires">> => 1800000}}, + ?assertEqual(Expected, to_map_recursive(Input)). + +data_coercion_to_map_recursive_atom_list_limitation(_Config) -> + Input = [admin, monitoring], + Expected = #{admin => true, monitoring => true}, + ?assertEqual(Expected, to_map_recursive(Input)). + +data_coercion_to_map_recursive_property(_Config) -> + ?assert( + proper:quickcheck( + ?FORALL(Input, proplist_or_value(), + begin + Result = to_map_recursive(Input), + to_map_recursive(Result) =:= Result + end), + [{numtests, 100}, + {max_size, 10}, + {on_output, fun(".", _) -> ok; + (F, A) -> ct:pal(?LOW_IMPORTANCE, F, A) + end}])). + +proplist_or_value() -> + ?LAZY(oneof([ + oneof([key1, key2, key3]), + binary(), + integer(), + ?SIZED(Size, resize(Size div 2, proplist_gen())), + ?SIZED(Size, resize(Size div 2, list(proplist_or_value()))) + ])). + +proplist_gen() -> + list(oneof([ + {oneof([key1, key2, key3]), proplist_or_value()}, + {binary(), proplist_or_value()}, + oneof([enabled, disabled, auto]) + ])). + pget(_Config) -> ?assertEqual(1, rabbit_misc:pget(a, [{a, 1}])), ?assertEqual(undefined, rabbit_misc:pget(b, [{a, 1}])),