From 39dc54a3bd8ece9ab25039c1cf0b78baad675aed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Tue, 24 Jun 2025 17:28:10 +0200 Subject: [PATCH 1/3] Support SQL filter expressions for stream consumers This is a RabbitMQ 4.2 feature. --- .github/workflows/test-pr.yml | 2 ++ .../rabbitmq/client/amqp/ConsumerBuilder.java | 21 +++++++++++++++++++ .../client/amqp/impl/AmqpConnection.java | 10 ++++++++- .../client/amqp/impl/AmqpConsumerBuilder.java | 14 +++++++++++-- .../com/rabbitmq/client/amqp/impl/Utils.java | 14 +++++++++++++ .../client/amqp/impl/SourceFiltersTest.java | 15 +++++++++++++ 6 files changed, 73 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test-pr.yml b/.github/workflows/test-pr.yml index 36fdfcda7..e38744ca0 100644 --- a/.github/workflows/test-pr.yml +++ b/.github/workflows/test-pr.yml @@ -24,6 +24,8 @@ jobs: cache: 'maven' - name: Start broker run: ci/start-broker.sh + env: + RABBITMQ_IMAGE: pivotalrabbitmq/rabbitmq:pr-14110-otp27 - name: Start toxiproxy run: ci/start-toxiproxy.sh - name: Display Java version diff --git a/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java b/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java index 7a4efbb87..ffdaa33d7 100644 --- a/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java @@ -565,6 +565,27 @@ interface FilterOptions { * @return type-parameter object */ T propertySymbol(String key, String value); + + /** + * Set an SQL filter expression. + * + *

Section 6 of the AMQP Filter Expressions specification defines the semantics of the + * feature, but the SQL syntax follows the JMS + * message selector syntax. + * + *

Requires RabbitMQ 4.2 or more. + * + * @param sql SQL expression + * @return type-parameter object + * @see JMS + * message selector syntax + * @see AMQP + * Filter Expressions + */ + T sql(String sql); } /** diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java index d4e5e9b86..1ab4d9963 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java @@ -26,6 +26,7 @@ import static com.rabbitmq.client.amqp.impl.Tuples.pair; import static com.rabbitmq.client.amqp.impl.Utils.supportFilterExpressions; import static com.rabbitmq.client.amqp.impl.Utils.supportSetToken; +import static com.rabbitmq.client.amqp.impl.Utils.supportSqlFilterExpressions; import static java.lang.System.nanoTime; import static java.time.Duration.ofNanos; @@ -115,7 +116,9 @@ final class AmqpConnection extends ResourceBase implements Connection { private final ConnectionSettings.AffinityStrategy affinityStrategy; private final String name; private final Lock instanceLock = new ReentrantLock(); - private final boolean filterExpressionsSupported, setTokenSupported; + private final boolean filterExpressionsSupported, + setTokenSupported, + sqlFilterExpressionsSupported; private volatile ConsumerWorkService consumerWorkService; private volatile Executor dispatchingExecutor; private final boolean privateDispatchingExecutor; @@ -212,6 +215,7 @@ final class AmqpConnection extends ResourceBase implements Connection { String brokerVersion = brokerVersion(this.nativeConnection); this.filterExpressionsSupported = supportFilterExpressions(brokerVersion); this.setTokenSupported = supportSetToken(brokerVersion); + this.sqlFilterExpressionsSupported = supportSqlFilterExpressions(brokerVersion); LOGGER.debug("Opened connection '{}' on node '{}'.", this.name(), this.connectionNodename()); this.state(OPEN); this.environment.metricsCollector().openConnection(); @@ -856,6 +860,10 @@ boolean filterExpressionsSupported() { return this.filterExpressionsSupported; } + boolean sqlFilterExpressionsSupported() { + return this.sqlFilterExpressionsSupported; + } + boolean setTokenSupported() { return this.setTokenSupported; } diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java index 1a45c66eb..e62d3b4d2 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java @@ -226,11 +226,11 @@ static StreamOptions streamOptions(Map filters) { private static class DefaultStreamFilterOptions implements StreamFilterOptions { - private final StreamOptions streamOptions; + private final DefaultStreamOptions streamOptions; private final Map filters; private DefaultStreamFilterOptions( - StreamOptions streamOptions, Map filters) { + DefaultStreamOptions streamOptions, Map filters) { this.streamOptions = streamOptions; this.filters = filters; } @@ -440,6 +440,16 @@ public StreamFilterOptions propertySymbol(String key, String value) { return this.applicationPropertyFilter(key, Symbol.valueOf(value)); } + @Override + public StreamFilterOptions sql(String sql) { + if (!this.streamOptions.builder.connection.filterExpressionsSupported()) { + throw new IllegalArgumentException( + "AMQP SQL filter expressions requires at least RabbitMQ 4.2.0"); + } + this.filters.put("sql-filter", filterValue("apache.org:selector-filter:string", sql)); + return this; + } + @Override public StreamOptions stream() { return this.streamOptions; diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java b/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java index 0f69534e3..a0cec51d4 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java @@ -236,6 +236,15 @@ static boolean is4_0_OrMore(String brokerVersion) { } } + static boolean is4_2_OrMore(String brokerVersion) { + try { + return versionCompare(currentVersion(brokerVersion), "4.2.0") >= 0; + } catch (Exception e) { + LOGGER.debug("Unable to parse broker version {}", brokerVersion, e); + return true; + } + } + static boolean is4_1_OrMore(String brokerVersion) { try { return versionCompare(currentVersion(brokerVersion), "4.1.0") >= 0; @@ -253,6 +262,11 @@ static boolean supportSetToken(String brokerVersion) { return is4_1_OrMore(brokerVersion); } + static boolean supportSqlFilterExpressions(String brokerVersion) { + // TODO should be 4.2 + return is4_1_OrMore(brokerVersion); + } + static final class ObservationConnectionInfo implements ObservationCollector.ConnectionInfo { private final String address; diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java index 32837f83c..dab0b1c5b 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java @@ -469,6 +469,21 @@ void filterExpressionStringModifier() { msgs.forEach(m -> assertThat(m).hasSubject("foo bar")); } + @Test + // TODO should be 4.2 + @BrokerVersionAtLeast(RABBITMQ_4_1_0) + void filterExpressionSql() { + publish(1, m -> m.subject("abc 123")); + publish(1, m -> m.subject("foo bar")); + publish(1, m -> m.subject("ab 12")); + + List msgs = consume(2, m -> m.sql("properties.subject LIKE 'ab%'")); + msgs.forEach(m -> assertThat(m.subject()).startsWith("ab")); + + msgs = consume(1, m -> m.sql("properties.subject like 'foo%'")); + msgs.forEach(m -> assertThat(m).hasSubject("foo bar")); + } + void publish(int messageCount) { this.publish(messageCount, UnaryOperator.identity()); } From d576d94a0338fc2e2a1fc91b8ab84fb268434c9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Tue, 24 Jun 2025 17:52:31 +0200 Subject: [PATCH 2/3] Throw exception if echoed filters do not match requested filters Fixes #217 --- .../client/amqp/impl/AmqpConsumer.java | 7 +++++++ .../client/amqp/impl/SourceFiltersTest.java | 20 ++++++++++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java index 82d6edac5..60936c50c 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java @@ -219,6 +219,7 @@ private ClientReceiver createNativeReceiver( (ClientReceiver) ExceptionUtils.wrapGet( nativeSession.openReceiver(address, receiverOptions).openFuture()); + boolean filterOk = true; if (!filters.isEmpty()) { Map remoteSourceFilters = receiver.source().filters(); for (Map.Entry localEntry : localSourceFilters.entrySet()) { @@ -227,9 +228,15 @@ private ClientReceiver createNativeReceiver( "Missing filter value in attach response: {} => {}", localEntry.getKey(), localEntry.getValue()); + filterOk = false; } } } + if (!filterOk) { + receiver.close(); + throw new AmqpException( + "The sending endpoint filters do not match the receiving endpoint filters"); + } return receiver; } catch (ClientException e) { throw ExceptionUtils.convert(e, "Error while creating receiver from '%s'", address); diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java index dab0b1c5b..1007b28c2 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java @@ -28,6 +28,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import com.rabbitmq.client.amqp.AmqpException; import com.rabbitmq.client.amqp.Connection; import com.rabbitmq.client.amqp.Consumer; import com.rabbitmq.client.amqp.ConsumerBuilder; @@ -472,7 +473,7 @@ void filterExpressionStringModifier() { @Test // TODO should be 4.2 @BrokerVersionAtLeast(RABBITMQ_4_1_0) - void filterExpressionSql() { + void sqlFilterExpressionsShouldFilterMessages() { publish(1, m -> m.subject("abc 123")); publish(1, m -> m.subject("foo bar")); publish(1, m -> m.subject("ab 12")); @@ -484,6 +485,23 @@ void filterExpressionSql() { msgs.forEach(m -> assertThat(m).hasSubject("foo bar")); } + @Test + // TODO should be 4.2 + @BrokerVersionAtLeast(RABBITMQ_4_1_0) + void incorrectFilterShouldThrowException() { + assertThatThrownBy( + () -> + connection.consumerBuilder().queue(name).messageHandler((ctx, msg) -> {}).stream() + .offset(FIRST) + .filter() + .sql("TRUE TRUE") + .stream() + .builder() + .build()) + .isInstanceOf(AmqpException.class) + .hasMessageContaining("filters do not match"); + } + void publish(int messageCount) { this.publish(messageCount, UnaryOperator.identity()); } From 5d3ec254c3e590546065e988a86239d970624d1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 30 Jun 2025 10:11:10 +0200 Subject: [PATCH 3/3] Test SQL filter expressions against RabbitMQ 4.2 --- .github/workflows/test-pr.yml | 2 -- src/main/java/com/rabbitmq/client/amqp/impl/Utils.java | 3 +-- .../com/rabbitmq/client/amqp/impl/SourceFiltersTest.java | 7 +++---- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/.github/workflows/test-pr.yml b/.github/workflows/test-pr.yml index e38744ca0..36fdfcda7 100644 --- a/.github/workflows/test-pr.yml +++ b/.github/workflows/test-pr.yml @@ -24,8 +24,6 @@ jobs: cache: 'maven' - name: Start broker run: ci/start-broker.sh - env: - RABBITMQ_IMAGE: pivotalrabbitmq/rabbitmq:pr-14110-otp27 - name: Start toxiproxy run: ci/start-toxiproxy.sh - name: Display Java version diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java b/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java index a0cec51d4..3078eec17 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java @@ -263,8 +263,7 @@ static boolean supportSetToken(String brokerVersion) { } static boolean supportSqlFilterExpressions(String brokerVersion) { - // TODO should be 4.2 - return is4_1_OrMore(brokerVersion); + return is4_2_OrMore(brokerVersion); } static final class ObservationConnectionInfo implements ObservationCollector.ConnectionInfo { diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java index 1007b28c2..c1d29c64a 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java @@ -21,6 +21,7 @@ import static com.rabbitmq.client.amqp.Management.QueueType.STREAM; import static com.rabbitmq.client.amqp.impl.Assertions.assertThat; import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_1_0; +import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_2_0; import static com.rabbitmq.client.amqp.impl.TestUtils.sync; import static com.rabbitmq.client.amqp.impl.TestUtils.waitUntilStable; import static java.nio.charset.StandardCharsets.*; @@ -471,8 +472,7 @@ void filterExpressionStringModifier() { } @Test - // TODO should be 4.2 - @BrokerVersionAtLeast(RABBITMQ_4_1_0) + @BrokerVersionAtLeast(RABBITMQ_4_2_0) void sqlFilterExpressionsShouldFilterMessages() { publish(1, m -> m.subject("abc 123")); publish(1, m -> m.subject("foo bar")); @@ -486,8 +486,7 @@ void sqlFilterExpressionsShouldFilterMessages() { } @Test - // TODO should be 4.2 - @BrokerVersionAtLeast(RABBITMQ_4_1_0) + @BrokerVersionAtLeast(RABBITMQ_4_2_0) void incorrectFilterShouldThrowException() { assertThatThrownBy( () ->