Skip to content

Commit 39dc54a

Browse files
committed
Support SQL filter expressions for stream consumers
This is a RabbitMQ 4.2 feature.
1 parent 96ec272 commit 39dc54a

File tree

6 files changed

+73
-3
lines changed

6 files changed

+73
-3
lines changed

.github/workflows/test-pr.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ jobs:
2424
cache: 'maven'
2525
- name: Start broker
2626
run: ci/start-broker.sh
27+
env:
28+
RABBITMQ_IMAGE: pivotalrabbitmq/rabbitmq:pr-14110-otp27
2729
- name: Start toxiproxy
2830
run: ci/start-toxiproxy.sh
2931
- name: Display Java version

src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,27 @@ interface FilterOptions<T> {
565565
* @return type-parameter object
566566
*/
567567
T propertySymbol(String key, String value);
568+
569+
/**
570+
* Set an SQL filter expression.
571+
*
572+
* <p>Section 6 of the AMQP Filter Expressions specification defines the semantics of the
573+
* feature, but the <b>SQL syntax follows the <a
574+
* href="https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#message-selector-syntax">JMS
575+
* message selector syntax</a></b>.
576+
*
577+
* <p>Requires RabbitMQ 4.2 or more.
578+
*
579+
* @param sql SQL expression
580+
* @return type-parameter object
581+
* @see <a
582+
* href="https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#message-selector-syntax">JMS
583+
* message selector syntax</a>
584+
* @see <a
585+
* href="https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227">AMQP
586+
* Filter Expressions</a>
587+
*/
588+
T sql(String sql);
568589
}
569590

570591
/**

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static com.rabbitmq.client.amqp.impl.Tuples.pair;
2727
import static com.rabbitmq.client.amqp.impl.Utils.supportFilterExpressions;
2828
import static com.rabbitmq.client.amqp.impl.Utils.supportSetToken;
29+
import static com.rabbitmq.client.amqp.impl.Utils.supportSqlFilterExpressions;
2930
import static java.lang.System.nanoTime;
3031
import static java.time.Duration.ofNanos;
3132

@@ -115,7 +116,9 @@ final class AmqpConnection extends ResourceBase implements Connection {
115116
private final ConnectionSettings.AffinityStrategy affinityStrategy;
116117
private final String name;
117118
private final Lock instanceLock = new ReentrantLock();
118-
private final boolean filterExpressionsSupported, setTokenSupported;
119+
private final boolean filterExpressionsSupported,
120+
setTokenSupported,
121+
sqlFilterExpressionsSupported;
119122
private volatile ConsumerWorkService consumerWorkService;
120123
private volatile Executor dispatchingExecutor;
121124
private final boolean privateDispatchingExecutor;
@@ -212,6 +215,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
212215
String brokerVersion = brokerVersion(this.nativeConnection);
213216
this.filterExpressionsSupported = supportFilterExpressions(brokerVersion);
214217
this.setTokenSupported = supportSetToken(brokerVersion);
218+
this.sqlFilterExpressionsSupported = supportSqlFilterExpressions(brokerVersion);
215219
LOGGER.debug("Opened connection '{}' on node '{}'.", this.name(), this.connectionNodename());
216220
this.state(OPEN);
217221
this.environment.metricsCollector().openConnection();
@@ -856,6 +860,10 @@ boolean filterExpressionsSupported() {
856860
return this.filterExpressionsSupported;
857861
}
858862

863+
boolean sqlFilterExpressionsSupported() {
864+
return this.sqlFilterExpressionsSupported;
865+
}
866+
859867
boolean setTokenSupported() {
860868
return this.setTokenSupported;
861869
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,11 +226,11 @@ static StreamOptions streamOptions(Map<String, DescribedType> filters) {
226226

227227
private static class DefaultStreamFilterOptions implements StreamFilterOptions {
228228

229-
private final StreamOptions streamOptions;
229+
private final DefaultStreamOptions streamOptions;
230230
private final Map<String, DescribedType> filters;
231231

232232
private DefaultStreamFilterOptions(
233-
StreamOptions streamOptions, Map<String, DescribedType> filters) {
233+
DefaultStreamOptions streamOptions, Map<String, DescribedType> filters) {
234234
this.streamOptions = streamOptions;
235235
this.filters = filters;
236236
}
@@ -440,6 +440,16 @@ public StreamFilterOptions propertySymbol(String key, String value) {
440440
return this.applicationPropertyFilter(key, Symbol.valueOf(value));
441441
}
442442

443+
@Override
444+
public StreamFilterOptions sql(String sql) {
445+
if (!this.streamOptions.builder.connection.filterExpressionsSupported()) {
446+
throw new IllegalArgumentException(
447+
"AMQP SQL filter expressions requires at least RabbitMQ 4.2.0");
448+
}
449+
this.filters.put("sql-filter", filterValue("apache.org:selector-filter:string", sql));
450+
return this;
451+
}
452+
443453
@Override
444454
public StreamOptions stream() {
445455
return this.streamOptions;

src/main/java/com/rabbitmq/client/amqp/impl/Utils.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,15 @@ static boolean is4_0_OrMore(String brokerVersion) {
236236
}
237237
}
238238

239+
static boolean is4_2_OrMore(String brokerVersion) {
240+
try {
241+
return versionCompare(currentVersion(brokerVersion), "4.2.0") >= 0;
242+
} catch (Exception e) {
243+
LOGGER.debug("Unable to parse broker version {}", brokerVersion, e);
244+
return true;
245+
}
246+
}
247+
239248
static boolean is4_1_OrMore(String brokerVersion) {
240249
try {
241250
return versionCompare(currentVersion(brokerVersion), "4.1.0") >= 0;
@@ -253,6 +262,11 @@ static boolean supportSetToken(String brokerVersion) {
253262
return is4_1_OrMore(brokerVersion);
254263
}
255264

265+
static boolean supportSqlFilterExpressions(String brokerVersion) {
266+
// TODO should be 4.2
267+
return is4_1_OrMore(brokerVersion);
268+
}
269+
256270
static final class ObservationConnectionInfo implements ObservationCollector.ConnectionInfo {
257271

258272
private final String address;

src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,21 @@ void filterExpressionStringModifier() {
469469
msgs.forEach(m -> assertThat(m).hasSubject("foo bar"));
470470
}
471471

472+
@Test
473+
// TODO should be 4.2
474+
@BrokerVersionAtLeast(RABBITMQ_4_1_0)
475+
void filterExpressionSql() {
476+
publish(1, m -> m.subject("abc 123"));
477+
publish(1, m -> m.subject("foo bar"));
478+
publish(1, m -> m.subject("ab 12"));
479+
480+
List<Message> msgs = consume(2, m -> m.sql("properties.subject LIKE 'ab%'"));
481+
msgs.forEach(m -> assertThat(m.subject()).startsWith("ab"));
482+
483+
msgs = consume(1, m -> m.sql("properties.subject like 'foo%'"));
484+
msgs.forEach(m -> assertThat(m).hasSubject("foo bar"));
485+
}
486+
472487
void publish(int messageCount) {
473488
this.publish(messageCount, UnaryOperator.identity());
474489
}

0 commit comments

Comments
 (0)