Skip to content

Commit 3a18fff

Browse files
ansdLoisSotoLopez
authored andcommitted
Support AMQP filter expressions (rabbitmq#12415)
* Support AMQP filter expressions ## What? This PR implements the following property filter expressions for AMQP clients consuming from streams as defined in [AMQP Filter Expressions Version 1.0 Working Draft 09](https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227): * properties filters [section 4.2.4] * application-properties filters [section 4.2.5] String prefix and suffix matching is also supported. This PR also fixes a bug where RabbitMQ would accept wrong filters. Specifically, prior to this PR the values of the filter-set's map were allowed to be symbols. However, "every value MUST be either null or of a described type which provides the archetype filter." ## Why? This feature adds the ability to RabbitMQ to have multiple concurrent clients each consuming only a subset of messages while maintaining message order. This feature also reduces network traffic between RabbitMQ and clients by only dispatching those messages that the clients are actually interested in. Note that AMQP filter expressions are more fine grained than the [bloom filter based stream filtering](https://www.rabbitmq.com/blog/2023/10/16/stream-filtering) because * they do not suffer false positives * the unit of filtering is per-message instead of per-chunk * matching can be performed on **multiple** values in the properties and application-properties sections * prefix and suffix matching on the actual values is supported. Both, AMQP filter expressions and bloom filters can be used together. ## How? If a filter isn't valid, RabbitMQ ignores the filter. RabbitMQ only replies with filters it actually supports and validated successfully to comply with: "The receiving endpoint sets its desired filter, the sending endpoint [RabbitMQ] sets the filter actually in place (including any filters defaulted at the node)." * Delete streams test case The test suite constructed a wrong filter-set. Specifically the value of the filter-set didn't use a described type as mandated by the spec. Using https://azure.github.io/amqpnetlite/api/Amqp.Types.DescribedValue.html throws errors that the descriptor can't be encoded. Given that this code path is already tests via the amqp_filtex_SUITE, this F# test gets therefore deleted. * Re-introduce the AMQP filter-set bug Since clients might rely on the wrong filter-set value type, we support the bug behind a deprecated feature flag and gradually remove support this bug. * Revert "Delete streams test case" This reverts commit c95cfea.
1 parent 09efbf8 commit 3a18fff

24 files changed

+1275
-303
lines changed

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -737,15 +737,13 @@ translate_terminus_durability(configuration) -> 1;
737737
translate_terminus_durability(unsettled_state) -> 2.
738738

739739
translate_filters(Filters)
740-
when is_map(Filters) andalso
741-
map_size(Filters) == 0 ->
740+
when map_size(Filters) =:= 0 ->
742741
undefined;
743-
translate_filters(Filters)
744-
when is_map(Filters) ->
742+
translate_filters(Filters) ->
745743
{map,
746744
maps:fold(
747745
fun
748-
(<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) ->
746+
(<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) ->
749747
%% special case conversion
750748
Key = sym(K),
751749
[{Key, {described, Key, translate_legacy_amqp_headers_binding(V)}} | Acc];

deps/amqp10_client/src/amqp10_msg.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,10 @@ wrap_ap_value(V) when is_integer(V) ->
433433
case V < 0 of
434434
true -> {int, V};
435435
false -> {uint, V}
436-
end.
436+
end;
437+
wrap_ap_value(V) when is_number(V) ->
438+
%% AMQP double and Erlang float are both 64-bit.
439+
{double, V}.
437440

438441
%% LOCAL
439442
header_value(durable, undefined) -> false;

deps/amqp10_common/app.bzl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ def all_srcs(name = "all_srcs"):
7272
)
7373
filegroup(
7474
name = "public_hdrs",
75-
srcs = ["include/amqp10_framing.hrl", "include/amqp10_types.hrl"],
75+
srcs = ["include/amqp10_filtex.hrl", "include/amqp10_framing.hrl", "include/amqp10_types.hrl"],
7676
)
7777
filegroup(
7878
name = "private_hdrs",
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
7+
8+
%% AMQP Filter Expressions Version 1.0 Working Draft 09
9+
%% https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227
10+
11+
-define(DESCRIPTOR_NAME_PROPERTIES_FILTER, <<"amqp:properties-filter">>).
12+
-define(DESCRIPTOR_CODE_PROPERTIES_FILTER, 16#173).
13+
14+
-define(DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER, <<"amqp:application-properties-filter">>).
15+
-define(DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER, 16#174).

deps/rabbit/BUILD.bazel

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1207,6 +1207,7 @@ rabbitmq_integration_suite(
12071207
name = "amqp_client_SUITE",
12081208
size = "large",
12091209
additional_beam = [
1210+
":test_amqp_utils_beam",
12101211
":test_event_recorder_beam",
12111212
],
12121213
shard_count = 3,
@@ -1215,6 +1216,16 @@ rabbitmq_integration_suite(
12151216
],
12161217
)
12171218

1219+
rabbitmq_integration_suite(
1220+
name = "amqp_filtex_SUITE",
1221+
additional_beam = [
1222+
":test_amqp_utils_beam",
1223+
],
1224+
runtime_deps = [
1225+
"//deps/rabbitmq_amqp_client:erlang_app",
1226+
],
1227+
)
1228+
12181229
rabbitmq_integration_suite(
12191230
name = "amqp_proxy_protocol_SUITE",
12201231
size = "medium",
@@ -1235,6 +1246,7 @@ rabbitmq_integration_suite(
12351246
rabbitmq_integration_suite(
12361247
name = "amqp_auth_SUITE",
12371248
additional_beam = [
1249+
":test_amqp_utils_beam",
12381250
":test_event_recorder_beam",
12391251
],
12401252
shard_count = 2,
@@ -1246,6 +1258,9 @@ rabbitmq_integration_suite(
12461258
rabbitmq_integration_suite(
12471259
name = "amqp_address_SUITE",
12481260
shard_count = 2,
1261+
additional_beam = [
1262+
":test_amqp_utils_beam",
1263+
],
12491264
runtime_deps = [
12501265
"//deps/rabbitmq_amqp_client:erlang_app",
12511266
],
@@ -1358,6 +1373,7 @@ eunit(
13581373
":test_clustering_utils_beam",
13591374
":test_event_recorder_beam",
13601375
":test_rabbit_ct_hook_beam",
1376+
":test_amqp_utils_beam",
13611377
],
13621378
target = ":test_erlang_app",
13631379
test_env = {

deps/rabbit/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ define ct_master.erl
258258
endef
259259

260260
PARALLEL_CT_SET_1_A = amqp_client unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking
261-
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_system signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
261+
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filtex amqp_system signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
262262
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack amqpl_direct_reply_to backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange rabbit_direct_reply_to_prop cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control
263263
PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit
264264

deps/rabbit/app.bzl

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ def all_beam_files(name = "all_beam_files"):
4545
"src/rabbit_access_control.erl",
4646
"src/rabbit_alarm.erl",
4747
"src/rabbit_amqp1_0.erl",
48+
"src/rabbit_amqp_filtex.erl",
4849
"src/rabbit_amqp_management.erl",
4950
"src/rabbit_amqp_reader.erl",
5051
"src/rabbit_amqp_session.erl",
@@ -302,6 +303,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
302303
"src/rabbit_access_control.erl",
303304
"src/rabbit_alarm.erl",
304305
"src/rabbit_amqp1_0.erl",
306+
"src/rabbit_amqp_filtex.erl",
305307
"src/rabbit_amqp_management.erl",
306308
"src/rabbit_amqp_reader.erl",
307309
"src/rabbit_amqp_session.erl",
@@ -578,6 +580,7 @@ def all_srcs(name = "all_srcs"):
578580
"src/rabbit_access_control.erl",
579581
"src/rabbit_alarm.erl",
580582
"src/rabbit_amqp1_0.erl",
583+
"src/rabbit_amqp_filtex.erl",
581584
"src/rabbit_amqp_management.erl",
582585
"src/rabbit_amqp_reader.erl",
583586
"src/rabbit_amqp_session.erl",
@@ -2195,3 +2198,20 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
21952198
erlc_opts = "//:test_erlc_opts",
21962199
deps = ["//deps/amqp_client:erlang_app"],
21972200
)
2201+
erlang_bytecode(
2202+
name = "amqp_filtex_SUITE_beam_files",
2203+
testonly = True,
2204+
srcs = ["test/amqp_filtex_SUITE.erl"],
2205+
outs = ["test/amqp_filtex_SUITE.beam"],
2206+
app_name = "rabbit",
2207+
erlc_opts = "//:test_erlc_opts",
2208+
deps = ["//deps/amqp10_common:erlang_app"],
2209+
)
2210+
erlang_bytecode(
2211+
name = "test_amqp_utils_beam",
2212+
testonly = True,
2213+
srcs = ["test/amqp_utils.erl"],
2214+
outs = ["test/amqp_utils.beam"],
2215+
app_name = "rabbit",
2216+
erlc_opts = "//:test_erlc_opts",
2217+
)

deps/rabbit/ct.test.spec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
, amqp_auth_SUITE
1717
, amqp_client_SUITE
1818
, amqp_credit_api_v2_SUITE
19+
, amqp_filtex_SUITE
1920
, amqp_proxy_protocol_SUITE
2021
, amqp_system_SUITE
2122
, amqpl_consumer_ack_SUITE

deps/rabbit/src/mc.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ message_id(BasicMsg) ->
301301
mc_compat:message_id(BasicMsg).
302302

303303
-spec property(atom(), state()) ->
304-
{utf8, binary()} | undefined.
304+
tagged_value().
305305
property(Property, #?MODULE{protocol = Proto,
306306
data = Data}) ->
307307
Proto:property(Property, Data);

deps/rabbit/src/mc_amqp.erl

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
-define(MESSAGE_ANNOTATIONS_GUESS_SIZE, 100).
2323

24-
-define(SIMPLE_VALUE(V),
24+
-define(IS_SIMPLE_VALUE(V),
2525
is_binary(V) orelse
2626
is_number(V) orelse
2727
is_boolean(V)).
@@ -145,16 +145,32 @@ property(Prop, #v1{bare_and_footer = Bin,
145145
Props = amqp10_framing:decode(PropsDescribed),
146146
property0(Prop, Props).
147147

148-
property0(correlation_id, #'v1_0.properties'{correlation_id = Corr}) ->
149-
Corr;
150-
property0(message_id, #'v1_0.properties'{message_id = MsgId}) ->
151-
MsgId;
152-
property0(user_id, #'v1_0.properties'{user_id = UserId}) ->
153-
UserId;
154-
property0(subject, #'v1_0.properties'{subject = Subject}) ->
155-
Subject;
156-
property0(to, #'v1_0.properties'{to = To}) ->
157-
To;
148+
property0(message_id, #'v1_0.properties'{message_id = Val}) ->
149+
Val;
150+
property0(user_id, #'v1_0.properties'{user_id = Val}) ->
151+
Val;
152+
property0(to, #'v1_0.properties'{to = Val}) ->
153+
Val;
154+
property0(subject, #'v1_0.properties'{subject = Val}) ->
155+
Val;
156+
property0(reply_to, #'v1_0.properties'{reply_to = Val}) ->
157+
Val;
158+
property0(correlation_id, #'v1_0.properties'{correlation_id = Val}) ->
159+
Val;
160+
property0(content_type, #'v1_0.properties'{content_type = Val}) ->
161+
Val;
162+
property0(content_encoding, #'v1_0.properties'{content_encoding = Val}) ->
163+
Val;
164+
property0(absolute_expiry_time, #'v1_0.properties'{absolute_expiry_time = Val}) ->
165+
Val;
166+
property0(creation_time, #'v1_0.properties'{creation_time = Val}) ->
167+
Val;
168+
property0(group_id, #'v1_0.properties'{group_id = Val}) ->
169+
Val;
170+
property0(group_sequence, #'v1_0.properties'{group_sequence = Val}) ->
171+
Val;
172+
property0(reply_to_group_id, #'v1_0.properties'{reply_to_group_id = Val}) ->
173+
Val;
158174
property0(_Prop, #'v1_0.properties'{}) ->
159175
undefined.
160176

@@ -454,7 +470,7 @@ message_annotations_as_simple_map(#v1{message_annotations = Content}) ->
454470
message_annotations_as_simple_map0(Content) ->
455471
%% the section record format really is terrible
456472
lists:filtermap(fun({{symbol, K}, {_T, V}})
457-
when ?SIMPLE_VALUE(V) ->
473+
when ?IS_SIMPLE_VALUE(V) ->
458474
{true, {K, V}};
459475
(_) ->
460476
false
@@ -480,7 +496,7 @@ application_properties_as_simple_map(
480496
application_properties_as_simple_map0(Content, L) ->
481497
%% the section record format really is terrible
482498
lists:foldl(fun({{utf8, K}, {_T, V}}, Acc)
483-
when ?SIMPLE_VALUE(V) ->
499+
when ?IS_SIMPLE_VALUE(V) ->
484500
[{K, V} | Acc];
485501
({{utf8, K}, V}, Acc)
486502
when V =:= undefined orelse is_boolean(V) ->

0 commit comments

Comments
 (0)