diff --git a/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java b/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java index 7a4efbb87..ffdaa33d7 100644 --- a/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java @@ -565,6 +565,27 @@ interface FilterOptions { * @return type-parameter object */ T propertySymbol(String key, String value); + + /** + * Set an SQL filter expression. + * + *

Section 6 of the AMQP Filter Expressions specification defines the semantics of the + * feature, but the SQL syntax follows the JMS + * message selector syntax. + * + *

Requires RabbitMQ 4.2 or more. + * + * @param sql SQL expression + * @return type-parameter object + * @see JMS + * message selector syntax + * @see AMQP + * Filter Expressions + */ + T sql(String sql); } /** diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java index d4e5e9b86..1ab4d9963 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java @@ -26,6 +26,7 @@ import static com.rabbitmq.client.amqp.impl.Tuples.pair; import static com.rabbitmq.client.amqp.impl.Utils.supportFilterExpressions; import static com.rabbitmq.client.amqp.impl.Utils.supportSetToken; +import static com.rabbitmq.client.amqp.impl.Utils.supportSqlFilterExpressions; import static java.lang.System.nanoTime; import static java.time.Duration.ofNanos; @@ -115,7 +116,9 @@ final class AmqpConnection extends ResourceBase implements Connection { private final ConnectionSettings.AffinityStrategy affinityStrategy; private final String name; private final Lock instanceLock = new ReentrantLock(); - private final boolean filterExpressionsSupported, setTokenSupported; + private final boolean filterExpressionsSupported, + setTokenSupported, + sqlFilterExpressionsSupported; private volatile ConsumerWorkService consumerWorkService; private volatile Executor dispatchingExecutor; private final boolean privateDispatchingExecutor; @@ -212,6 +215,7 @@ final class AmqpConnection extends ResourceBase implements Connection { String brokerVersion = brokerVersion(this.nativeConnection); this.filterExpressionsSupported = supportFilterExpressions(brokerVersion); this.setTokenSupported = supportSetToken(brokerVersion); + this.sqlFilterExpressionsSupported = supportSqlFilterExpressions(brokerVersion); LOGGER.debug("Opened connection '{}' on node '{}'.", this.name(), this.connectionNodename()); this.state(OPEN); this.environment.metricsCollector().openConnection(); @@ -856,6 +860,10 @@ boolean filterExpressionsSupported() { return this.filterExpressionsSupported; } + boolean sqlFilterExpressionsSupported() { + return this.sqlFilterExpressionsSupported; + } + boolean setTokenSupported() { return this.setTokenSupported; } diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java index 82d6edac5..60936c50c 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java @@ -219,6 +219,7 @@ private ClientReceiver createNativeReceiver( (ClientReceiver) ExceptionUtils.wrapGet( nativeSession.openReceiver(address, receiverOptions).openFuture()); + boolean filterOk = true; if (!filters.isEmpty()) { Map remoteSourceFilters = receiver.source().filters(); for (Map.Entry localEntry : localSourceFilters.entrySet()) { @@ -227,9 +228,15 @@ private ClientReceiver createNativeReceiver( "Missing filter value in attach response: {} => {}", localEntry.getKey(), localEntry.getValue()); + filterOk = false; } } } + if (!filterOk) { + receiver.close(); + throw new AmqpException( + "The sending endpoint filters do not match the receiving endpoint filters"); + } return receiver; } catch (ClientException e) { throw ExceptionUtils.convert(e, "Error while creating receiver from '%s'", address); diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java index 1a45c66eb..e62d3b4d2 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java @@ -226,11 +226,11 @@ static StreamOptions streamOptions(Map filters) { private static class DefaultStreamFilterOptions implements StreamFilterOptions { - private final StreamOptions streamOptions; + private final DefaultStreamOptions streamOptions; private final Map filters; private DefaultStreamFilterOptions( - StreamOptions streamOptions, Map filters) { + DefaultStreamOptions streamOptions, Map filters) { this.streamOptions = streamOptions; this.filters = filters; } @@ -440,6 +440,16 @@ public StreamFilterOptions propertySymbol(String key, String value) { return this.applicationPropertyFilter(key, Symbol.valueOf(value)); } + @Override + public StreamFilterOptions sql(String sql) { + if (!this.streamOptions.builder.connection.filterExpressionsSupported()) { + throw new IllegalArgumentException( + "AMQP SQL filter expressions requires at least RabbitMQ 4.2.0"); + } + this.filters.put("sql-filter", filterValue("apache.org:selector-filter:string", sql)); + return this; + } + @Override public StreamOptions stream() { return this.streamOptions; diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java b/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java index 0f69534e3..3078eec17 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java @@ -236,6 +236,15 @@ static boolean is4_0_OrMore(String brokerVersion) { } } + static boolean is4_2_OrMore(String brokerVersion) { + try { + return versionCompare(currentVersion(brokerVersion), "4.2.0") >= 0; + } catch (Exception e) { + LOGGER.debug("Unable to parse broker version {}", brokerVersion, e); + return true; + } + } + static boolean is4_1_OrMore(String brokerVersion) { try { return versionCompare(currentVersion(brokerVersion), "4.1.0") >= 0; @@ -253,6 +262,10 @@ static boolean supportSetToken(String brokerVersion) { return is4_1_OrMore(brokerVersion); } + static boolean supportSqlFilterExpressions(String brokerVersion) { + return is4_2_OrMore(brokerVersion); + } + static final class ObservationConnectionInfo implements ObservationCollector.ConnectionInfo { private final String address; diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java index 32837f83c..c1d29c64a 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java @@ -21,6 +21,7 @@ import static com.rabbitmq.client.amqp.Management.QueueType.STREAM; import static com.rabbitmq.client.amqp.impl.Assertions.assertThat; import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_1_0; +import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_2_0; import static com.rabbitmq.client.amqp.impl.TestUtils.sync; import static com.rabbitmq.client.amqp.impl.TestUtils.waitUntilStable; import static java.nio.charset.StandardCharsets.*; @@ -28,6 +29,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import com.rabbitmq.client.amqp.AmqpException; import com.rabbitmq.client.amqp.Connection; import com.rabbitmq.client.amqp.Consumer; import com.rabbitmq.client.amqp.ConsumerBuilder; @@ -469,6 +471,36 @@ void filterExpressionStringModifier() { msgs.forEach(m -> assertThat(m).hasSubject("foo bar")); } + @Test + @BrokerVersionAtLeast(RABBITMQ_4_2_0) + void sqlFilterExpressionsShouldFilterMessages() { + publish(1, m -> m.subject("abc 123")); + publish(1, m -> m.subject("foo bar")); + publish(1, m -> m.subject("ab 12")); + + List msgs = consume(2, m -> m.sql("properties.subject LIKE 'ab%'")); + msgs.forEach(m -> assertThat(m.subject()).startsWith("ab")); + + msgs = consume(1, m -> m.sql("properties.subject like 'foo%'")); + msgs.forEach(m -> assertThat(m).hasSubject("foo bar")); + } + + @Test + @BrokerVersionAtLeast(RABBITMQ_4_2_0) + void incorrectFilterShouldThrowException() { + assertThatThrownBy( + () -> + connection.consumerBuilder().queue(name).messageHandler((ctx, msg) -> {}).stream() + .offset(FIRST) + .filter() + .sql("TRUE TRUE") + .stream() + .builder() + .build()) + .isInstanceOf(AmqpException.class) + .hasMessageContaining("filters do not match"); + } + void publish(int messageCount) { this.publish(messageCount, UnaryOperator.identity()); }