Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,27 @@ interface FilterOptions<T> {
* @return type-parameter object
*/
T propertySymbol(String key, String value);

/**
* Set an SQL filter expression.
*
* <p>Section 6 of the AMQP Filter Expressions specification defines the semantics of the
* feature, but the <b>SQL syntax follows the <a
* href="https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#message-selector-syntax">JMS
* message selector syntax</a></b>.
*
* <p>Requires RabbitMQ 4.2 or more.
*
* @param sql SQL expression
* @return type-parameter object
* @see <a
* href="https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#message-selector-syntax">JMS
* message selector syntax</a>
* @see <a
* href="https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227">AMQP
* Filter Expressions</a>
*/
T sql(String sql);
}

/**
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static com.rabbitmq.client.amqp.impl.Tuples.pair;
import static com.rabbitmq.client.amqp.impl.Utils.supportFilterExpressions;
import static com.rabbitmq.client.amqp.impl.Utils.supportSetToken;
import static com.rabbitmq.client.amqp.impl.Utils.supportSqlFilterExpressions;
import static java.lang.System.nanoTime;
import static java.time.Duration.ofNanos;

Expand Down Expand Up @@ -115,7 +116,9 @@ final class AmqpConnection extends ResourceBase implements Connection {
private final ConnectionSettings.AffinityStrategy affinityStrategy;
private final String name;
private final Lock instanceLock = new ReentrantLock();
private final boolean filterExpressionsSupported, setTokenSupported;
private final boolean filterExpressionsSupported,
setTokenSupported,
sqlFilterExpressionsSupported;
private volatile ConsumerWorkService consumerWorkService;
private volatile Executor dispatchingExecutor;
private final boolean privateDispatchingExecutor;
Expand Down Expand Up @@ -212,6 +215,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
String brokerVersion = brokerVersion(this.nativeConnection);
this.filterExpressionsSupported = supportFilterExpressions(brokerVersion);
this.setTokenSupported = supportSetToken(brokerVersion);
this.sqlFilterExpressionsSupported = supportSqlFilterExpressions(brokerVersion);
LOGGER.debug("Opened connection '{}' on node '{}'.", this.name(), this.connectionNodename());
this.state(OPEN);
this.environment.metricsCollector().openConnection();
Expand Down Expand Up @@ -856,6 +860,10 @@ boolean filterExpressionsSupported() {
return this.filterExpressionsSupported;
}

boolean sqlFilterExpressionsSupported() {
return this.sqlFilterExpressionsSupported;
}

boolean setTokenSupported() {
return this.setTokenSupported;
}
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ private ClientReceiver createNativeReceiver(
(ClientReceiver)
ExceptionUtils.wrapGet(
nativeSession.openReceiver(address, receiverOptions).openFuture());
boolean filterOk = true;
if (!filters.isEmpty()) {
Map<String, String> remoteSourceFilters = receiver.source().filters();
for (Map.Entry<String, Object> localEntry : localSourceFilters.entrySet()) {
Expand All @@ -227,9 +228,15 @@ private ClientReceiver createNativeReceiver(
"Missing filter value in attach response: {} => {}",
localEntry.getKey(),
localEntry.getValue());
filterOk = false;
}
}
}
if (!filterOk) {
receiver.close();
throw new AmqpException(
"The sending endpoint filters do not match the receiving endpoint filters");
}
return receiver;
} catch (ClientException e) {
throw ExceptionUtils.convert(e, "Error while creating receiver from '%s'", address);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,11 @@ static StreamOptions streamOptions(Map<String, DescribedType> filters) {

private static class DefaultStreamFilterOptions implements StreamFilterOptions {

private final StreamOptions streamOptions;
private final DefaultStreamOptions streamOptions;
private final Map<String, DescribedType> filters;

private DefaultStreamFilterOptions(
StreamOptions streamOptions, Map<String, DescribedType> filters) {
DefaultStreamOptions streamOptions, Map<String, DescribedType> filters) {
this.streamOptions = streamOptions;
this.filters = filters;
}
Expand Down Expand Up @@ -440,6 +440,16 @@ public StreamFilterOptions propertySymbol(String key, String value) {
return this.applicationPropertyFilter(key, Symbol.valueOf(value));
}

@Override
public StreamFilterOptions sql(String sql) {
if (!this.streamOptions.builder.connection.filterExpressionsSupported()) {
throw new IllegalArgumentException(
"AMQP SQL filter expressions requires at least RabbitMQ 4.2.0");
}
this.filters.put("sql-filter", filterValue("apache.org:selector-filter:string", sql));
return this;
}

@Override
public StreamOptions stream() {
return this.streamOptions;
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/com/rabbitmq/client/amqp/impl/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,15 @@ static boolean is4_0_OrMore(String brokerVersion) {
}
}

static boolean is4_2_OrMore(String brokerVersion) {
try {
return versionCompare(currentVersion(brokerVersion), "4.2.0") >= 0;
} catch (Exception e) {
LOGGER.debug("Unable to parse broker version {}", brokerVersion, e);
return true;
}
}

static boolean is4_1_OrMore(String brokerVersion) {
try {
return versionCompare(currentVersion(brokerVersion), "4.1.0") >= 0;
Expand All @@ -253,6 +262,10 @@ static boolean supportSetToken(String brokerVersion) {
return is4_1_OrMore(brokerVersion);
}

static boolean supportSqlFilterExpressions(String brokerVersion) {
return is4_2_OrMore(brokerVersion);
}

static final class ObservationConnectionInfo implements ObservationCollector.ConnectionInfo {

private final String address;
Expand Down
32 changes: 32 additions & 0 deletions src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import static com.rabbitmq.client.amqp.Management.QueueType.STREAM;
import static com.rabbitmq.client.amqp.impl.Assertions.assertThat;
import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_1_0;
import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_2_0;
import static com.rabbitmq.client.amqp.impl.TestUtils.sync;
import static com.rabbitmq.client.amqp.impl.TestUtils.waitUntilStable;
import static java.nio.charset.StandardCharsets.*;
import static java.util.stream.IntStream.range;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.rabbitmq.client.amqp.AmqpException;
import com.rabbitmq.client.amqp.Connection;
import com.rabbitmq.client.amqp.Consumer;
import com.rabbitmq.client.amqp.ConsumerBuilder;
Expand Down Expand Up @@ -469,6 +471,36 @@ void filterExpressionStringModifier() {
msgs.forEach(m -> assertThat(m).hasSubject("foo bar"));
}

@Test
@BrokerVersionAtLeast(RABBITMQ_4_2_0)
void sqlFilterExpressionsShouldFilterMessages() {
publish(1, m -> m.subject("abc 123"));
publish(1, m -> m.subject("foo bar"));
publish(1, m -> m.subject("ab 12"));

List<Message> msgs = consume(2, m -> m.sql("properties.subject LIKE 'ab%'"));
msgs.forEach(m -> assertThat(m.subject()).startsWith("ab"));

msgs = consume(1, m -> m.sql("properties.subject like 'foo%'"));
msgs.forEach(m -> assertThat(m).hasSubject("foo bar"));
}

@Test
@BrokerVersionAtLeast(RABBITMQ_4_2_0)
void incorrectFilterShouldThrowException() {
assertThatThrownBy(
() ->
connection.consumerBuilder().queue(name).messageHandler((ctx, msg) -> {}).stream()
.offset(FIRST)
.filter()
.sql("TRUE TRUE")
.stream()
.builder()
.build())
.isInstanceOf(AmqpException.class)
.hasMessageContaining("filters do not match");
}

void publish(int messageCount) {
this.publish(messageCount, UnaryOperator.identity());
}
Expand Down