From 63a91d0a7e61df64ef8e32301d0fe903612ddb06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 4 Oct 2024 18:17:08 +0200 Subject: [PATCH 1/4] Start support for filter expressions WIP --- pom.xml | 8 + .../rabbitmq/client/amqp/ConsumerBuilder.java | 79 +++ .../client/amqp/impl/AmqpConsumer.java | 7 +- .../client/amqp/impl/AmqpConsumerBuilder.java | 257 +++++++- .../client/amqp/impl/AmqpMessage.java | 2 +- .../amqp/impl/AmqpTestInfrastructure.java | 27 + .../impl/AmqpTestInfrastructureExtension.java | 1 + .../rabbitmq/client/amqp/impl/Assertions.java | 61 ++ .../client/amqp/impl/SourceFiltersTest.java | 549 +++++++++++------- .../rabbitmq/client/amqp/impl/TestUtils.java | 5 +- 10 files changed, 748 insertions(+), 248 deletions(-) create mode 100644 src/test/java/com/rabbitmq/client/amqp/impl/AmqpTestInfrastructure.java diff --git a/pom.xml b/pom.xml index c23d4ad52..6b51ceabf 100644 --- a/pom.xml +++ b/pom.xml @@ -50,6 +50,7 @@ 5.11.2 3.26.3 5.14.1 + 1.9.1 5.20.0 1.3.4 1.0.4 @@ -184,6 +185,13 @@ test + + net.jqwik + jqwik + ${jqwik.version} + test + + eu.rekawek.toxiproxy toxiproxy-java diff --git a/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java b/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java index 64ab859b7..1eb1a1a1f 100644 --- a/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java @@ -17,7 +17,10 @@ // info@rabbitmq.com. package com.rabbitmq.client.amqp; +import java.math.BigDecimal; import java.time.Instant; +import java.util.UUID; +import org.apache.qpid.protonj2.types.*; /** API to configure and create a {@link Consumer}. */ public interface ConsumerBuilder { @@ -158,6 +161,8 @@ interface StreamOptions { */ StreamOptions filterMatchUnfiltered(boolean matchUnfiltered); + StreamFilterOptions filter(); + /** * Return the consumer builder. * @@ -176,6 +181,80 @@ enum StreamOffsetSpecification { NEXT } + interface FilterOptions { + + T messageId(Object id); + + T messageId(String id); + + T messageId(long id); + + T messageId(byte[] id); + + T messageId(UUID id); + + T correlationId(Object correlationId); + + T correlationId(String correlationId); + + T correlationId(long correlationId); + + T correlationId(byte[] correlationId); + + T correlationId(UUID correlationId); + + T userId(byte[] userId); + + T to(String to); + + T subject(String subject); + + T property(String key, boolean value); + + T property(String key, byte value); + + T property(String key, short value); + + T property(String key, int value); + + T property(String key, long value); + + T propertyUnsigned(String key, byte value); + + T propertyUnsigned(String key, short value); + + T propertyUnsigned(String key, int value); + + T propertyUnsigned(String key, long value); + + T property(String key, float value); + + T property(String key, double value); + + T propertyDecimal32(String key, BigDecimal value); + + T propertyDecimal64(String key, BigDecimal value); + + T propertyDecimal128(String key, BigDecimal value); + + T property(String key, char value); + + T propertyTimestamp(String key, long value); + + T property(String key, UUID value); + + T property(String key, byte[] value); + + T property(String key, String value); + + T propertySymbol(String key, String value); + } + + interface StreamFilterOptions extends FilterOptions { + + StreamOptions stream(); + } + /** * Callback to modify a consumer subscription before the link creation. * diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java index 2e6a5935d..5356ae13f 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java @@ -41,6 +41,7 @@ import org.apache.qpid.protonj2.engine.impl.ProtonLinkCreditState; import org.apache.qpid.protonj2.engine.impl.ProtonReceiver; import org.apache.qpid.protonj2.engine.impl.ProtonSessionIncomingWindow; +import org.apache.qpid.protonj2.types.DescribedType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +59,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer { private final Long id; private final String address; private final String queue; - private final Map filters; + private final Map filters; private final Map linkProperties; private final ConsumerBuilder.SubscriptionListener subscriptionListener; private final AmqpConnection connection; @@ -161,7 +162,7 @@ private ClientReceiver createNativeReceiver( Session nativeSession, String address, Map properties, - Map filters, + Map filters, SubscriptionListener subscriptionListener) { try { filters = new LinkedHashMap<>(filters); @@ -175,7 +176,7 @@ private ClientReceiver createNativeReceiver( .creditWindow(0) .properties(properties); if (!filters.isEmpty()) { - receiverOptions.sourceOptions().filters(filters); + receiverOptions.sourceOptions().filters(Map.copyOf(filters)); } return (ClientReceiver) ExceptionUtils.wrapGet(nativeSession.openReceiver(address, receiverOptions).openFuture()); diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java index 789fa3a96..dac13edbc 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java @@ -22,8 +22,10 @@ import com.rabbitmq.client.amqp.Consumer; import com.rabbitmq.client.amqp.ConsumerBuilder; import com.rabbitmq.client.amqp.Resource; +import java.math.BigDecimal; import java.time.Instant; import java.util.*; +import org.apache.qpid.protonj2.types.*; class AmqpConsumerBuilder implements ConsumerBuilder { @@ -34,7 +36,7 @@ class AmqpConsumerBuilder implements ConsumerBuilder { private Consumer.MessageHandler messageHandler; private int initialCredits = 100; private final List listeners = new ArrayList<>(); - private final Map filters = new LinkedHashMap<>(); + private final Map filters = new LinkedHashMap<>(); private final Map properties = new LinkedHashMap<>(); private final StreamOptions streamOptions = new DefaultStreamOptions(this, this.filters); private SubscriptionListener subscriptionListener = NO_OP_SUBSCRIPTION_LISTENER; @@ -116,7 +118,7 @@ List listeners() { return listeners; } - Map filters() { + Map filters() { return this.filters; } @@ -133,32 +135,31 @@ public Consumer build() { private static class DefaultStreamOptions implements StreamOptions { - private final Map filters; + private final Map filters; private final ConsumerBuilder builder; + private final StreamFilterOptions filterOptions; - private DefaultStreamOptions(ConsumerBuilder builder, Map filters) { + private DefaultStreamOptions(ConsumerBuilder builder, Map filters) { this.builder = builder; this.filters = filters; + this.filterOptions = new DefaultStreamFilterOptions(this, filters); } @Override public StreamOptions offset(long offset) { - this.filters.put("rabbitmq:stream-offset-spec", offset); - return this; + return this.filter("rabbitmq:stream-offset-spec", offset); } @Override public StreamOptions offset(Instant timestamp) { notNull(timestamp, "Timestamp offset cannot be null"); - this.offsetSpecification(Date.from(timestamp)); - return this; + return this.offsetSpecification(Date.from(timestamp)); } @Override public StreamOptions offset(StreamOffsetSpecification specification) { notNull(specification, "Offset specification cannot be null"); - this.offsetSpecification(specification.name().toLowerCase(Locale.ENGLISH)); - return this; + return this.offsetSpecification(specification.name().toLowerCase(Locale.ENGLISH)); } @Override @@ -171,8 +172,7 @@ public StreamOptions offset(String interval) { + ". " + "Valid examples are: 1Y, 7D, 10m. See https://www.rabbitmq.com/docs/streams#retention."); } - this.offsetSpecification(interval); - return this; + return this.offsetSpecification(interval); } @Override @@ -180,14 +180,17 @@ public StreamOptions filterValues(String... values) { if (values == null || values.length == 0) { throw new IllegalArgumentException("At least one stream filter value must specified"); } - this.filters.put("rabbitmq:stream-filter", Arrays.asList(values)); - return this; + return this.filter("rabbitmq:stream-filter", Arrays.asList(values)); } @Override public StreamOptions filterMatchUnfiltered(boolean matchUnfiltered) { - this.filters.put("rabbitmq:stream-match-unfiltered", matchUnfiltered); - return this; + return this.filter("rabbitmq:stream-match-unfiltered", matchUnfiltered); + } + + @Override + public StreamFilterOptions filter() { + return this.filterOptions; } @Override @@ -195,12 +198,228 @@ public ConsumerBuilder builder() { return this.builder; } - private void offsetSpecification(Object value) { - this.filters.put("rabbitmq:stream-offset-spec", value); + private StreamOptions offsetSpecification(Object value) { + return filter("rabbitmq:stream-offset-spec", value); } + + private StreamOptions filter(String key, Object value) { + AmqpConsumerBuilder.filter(this.filters, key, value); + return this; + } + } + + private static void filter(Map filters, String filterName, Object value) { + filters.put(filterName, filterValue(filterName, value)); + } + + private static DescribedType filterValue(String filterName, Object value) { + return new UnknownDescribedType(Symbol.getSymbol(filterName), value); } - static StreamOptions streamOptions(Map filters) { + static StreamOptions streamOptions(Map filters) { return new DefaultStreamOptions(null, filters); } + + private static class DefaultStreamFilterOptions implements StreamFilterOptions { + + private final StreamOptions streamOptions; + private final Map filters; + + private DefaultStreamFilterOptions( + StreamOptions streamOptions, Map filters) { + this.streamOptions = streamOptions; + this.filters = filters; + } + + @Override + public StreamFilterOptions messageId(Object id) { + return propertyFilter("message-id", id); + } + + @Override + public StreamFilterOptions messageId(String id) { + return propertyFilter("message-id", id); + } + + @Override + public StreamFilterOptions messageId(long id) { + return propertyFilter("message-id", new UnsignedLong(id)); + } + + @Override + public StreamFilterOptions messageId(byte[] id) { + return propertyFilter("message-id", new Binary(id)); + } + + @Override + public StreamFilterOptions messageId(UUID id) { + return propertyFilter("message-id", id); + } + + @Override + public StreamFilterOptions correlationId(Object correlationId) { + return propertyFilter("correlation-id", correlationId); + } + + @Override + public StreamFilterOptions correlationId(String correlationId) { + return propertyFilter("correlation-id", correlationId); + } + + @Override + public StreamFilterOptions correlationId(long correlationId) { + return propertyFilter("correlation-id", new UnsignedLong(correlationId)); + } + + @Override + public StreamFilterOptions correlationId(byte[] correlationId) { + return propertyFilter("correlation-id", new Binary(correlationId)); + } + + @Override + public StreamFilterOptions correlationId(UUID correlationId) { + return propertyFilter("correlation-id", correlationId); + } + + @Override + public StreamFilterOptions userId(byte[] userId) { + return propertyFilter("user-id", new Binary(userId)); + } + + @Override + public StreamFilterOptions to(String to) { + return propertyFilter("to", to); + } + + @Override + public StreamFilterOptions subject(String subject) { + return propertyFilter("subject", subject); + } + + @Override + public StreamFilterOptions property(String key, boolean value) { + return this.applicationPropertyFilter(key, value); + } + + @Override + public StreamFilterOptions property(String key, byte value) { + return this.applicationPropertyFilter(key, value); + } + + @Override + public StreamFilterOptions property(String key, short value) { + return this.applicationPropertyFilter(key, value); + } + + @Override + public StreamFilterOptions property(String key, int value) { + return this.applicationPropertyFilter(key, value); + } + + @Override + public StreamFilterOptions property(String key, long value) { + return this.applicationPropertyFilter(key, value); + } + + @Override + public StreamFilterOptions propertyUnsigned(String key, byte value) { + return this.applicationPropertyFilter(key, UnsignedByte.valueOf(value)); + } + + @Override + public StreamFilterOptions propertyUnsigned(String key, short value) { + return this.applicationPropertyFilter(key, UnsignedShort.valueOf(value)); + } + + @Override + public StreamFilterOptions propertyUnsigned(String key, int value) { + return this.applicationPropertyFilter(key, UnsignedInteger.valueOf(value)); + } + + @Override + public StreamFilterOptions propertyUnsigned(String key, long value) { + return this.applicationPropertyFilter(key, UnsignedLong.valueOf(value)); + } + + @Override + public StreamFilterOptions property(String key, float value) { + return this.applicationPropertyFilter(key, value); + } + + @Override + public StreamFilterOptions property(String key, double value) { + return this.applicationPropertyFilter(key, value); + } + + @Override + public StreamFilterOptions propertyDecimal32(String key, BigDecimal value) { + return this.applicationPropertyFilter(key, new Decimal32(value)); + } + + @Override + public StreamFilterOptions propertyDecimal64(String key, BigDecimal value) { + return this.applicationPropertyFilter(key, new Decimal64(value)); + } + + @Override + public StreamFilterOptions propertyDecimal128(String key, BigDecimal value) { + return this.applicationPropertyFilter(key, new Decimal128(value)); + } + + @Override + public StreamFilterOptions property(String key, char value) { + return this.applicationPropertyFilter(key, value); + } + + @Override + public StreamFilterOptions propertyTimestamp(String key, long value) { + return this.applicationPropertyFilter(key, new Date(value)); + } + + @Override + public StreamFilterOptions property(String key, UUID value) { + return this.applicationPropertyFilter(key, value); + } + + @Override + public StreamFilterOptions property(String key, byte[] value) { + return this.applicationPropertyFilter(key, new Binary(value)); + } + + @Override + public StreamFilterOptions property(String key, String value) { + return this.applicationPropertyFilter(key, value); + } + + @Override + public StreamFilterOptions propertySymbol(String key, String value) { + return this.applicationPropertyFilter(key, Symbol.valueOf(value)); + } + + @Override + public StreamOptions stream() { + return this.streamOptions; + } + + private StreamFilterOptions propertyFilter(String propertyKey, Object propertyValue) { + Map filter = filter("amqp:properties-filter"); + filter.put(Symbol.valueOf(propertyKey), propertyValue); + return this; + } + + private StreamFilterOptions applicationPropertyFilter( + String propertyKey, Object propertyValue) { + Map filter = filter("amqp:application-properties-filter"); + filter.put(propertyKey, propertyValue); + return this; + } + + @SuppressWarnings("unchecked") + private Map filter(String filterName) { + DescribedType type = + this.filters.computeIfAbsent( + filterName, fName -> filterValue(fName, new LinkedHashMap<>())); + return (Map) type.getDescribed(); + } + } } diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java index e0d62c996..18931888a 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java @@ -29,7 +29,7 @@ import org.apache.qpid.protonj2.client.exceptions.ClientException; import org.apache.qpid.protonj2.types.*; -class AmqpMessage implements Message { +final class AmqpMessage implements Message { private static final byte[] EMPTY_BODY = new byte[0]; diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTestInfrastructure.java b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTestInfrastructure.java new file mode 100644 index 000000000..af9019515 --- /dev/null +++ b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTestInfrastructure.java @@ -0,0 +1,27 @@ +// Copyright (c) 2024 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.client.amqp.impl; + +import java.lang.annotation.*; +import org.junit.jupiter.api.extension.ExtendWith; + +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@ExtendWith(AmqpTestInfrastructureExtension.class) +public @interface AmqpTestInfrastructure {} diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTestInfrastructureExtension.java b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTestInfrastructureExtension.java index ad57a0601..72d31e824 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTestInfrastructureExtension.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTestInfrastructureExtension.java @@ -22,6 +22,7 @@ import com.rabbitmq.client.amqp.BackOffDelayPolicy; import com.rabbitmq.client.amqp.Connection; import com.rabbitmq.client.amqp.Environment; +import java.lang.annotation.*; import java.lang.reflect.Field; import org.junit.jupiter.api.extension.*; diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java b/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java index 406a52f04..54ca398bd 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import org.apache.qpid.protonj2.types.UnsignedLong; import org.assertj.core.api.AbstractObjectAssert; final class Assertions { @@ -226,6 +227,10 @@ private MessageAssert(Message message) { super(message, MessageAssert.class); } + MessageAssert hasId(long id) { + return this.hasId(new UnsignedLong(id)); + } + MessageAssert hasId(Object id) { isNotNull(); if (!actual.messageId().equals(id)) { @@ -234,6 +239,62 @@ MessageAssert hasId(Object id) { return this; } + MessageAssert hasCorrelationId(long correlationId) { + return this.hasCorrelationId(new UnsignedLong(correlationId)); + } + + MessageAssert hasCorrelationId(Object id) { + isNotNull(); + if (!actual.correlationId().equals(id)) { + fail("Correlation ID should be '%s' but is '%s'", id, actual.correlationId()); + } + return this; + } + + MessageAssert hasUserId(byte[] userId) { + isNotNull(); + org.assertj.core.api.Assertions.assertThat(actual.userId()).isEqualTo(userId); + return this; + } + + MessageAssert hasTo(String to) { + isNotNull(); + if (!actual.to().equals(to)) { + fail("To field should be '%s' but is '%s'", to, actual.to()); + } + return this; + } + + MessageAssert hasSubject(String subject) { + isNotNull(); + if (!actual.subject().equals(subject)) { + fail("Subject should be '%s' but is '%s'", subject, actual.subject()); + } + return this; + } + + MessageAssert hasProperty(String key) { + isNotNull(); + if (!actual.hasProperty(key)) { + fail("Message should have property '%s' but does not", key); + } + return this; + } + + MessageAssert hasProperty(String key, Object value) { + if (key == null || value == null) { + throw new IllegalArgumentException(); + } + isNotNull(); + hasProperty(key); + if (!value.equals(this.actual.property(key))) { + fail( + "Message should have property '%s = %s' but has '%s = %s'", + key, value, key, this.actual.property(key)); + } + return this; + } + MessageAssert hasAnnotation(String key) { isNotNull(); if (!actual.hasAnnotation(key)) { diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java index fd041b268..c8609b83d 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java @@ -18,217 +18,201 @@ package com.rabbitmq.client.amqp.impl; import static com.rabbitmq.client.amqp.ConsumerBuilder.StreamOffsetSpecification.*; +import static com.rabbitmq.client.amqp.Management.QueueType.STREAM; +import static com.rabbitmq.client.amqp.impl.Assertions.assertThat; import static com.rabbitmq.client.amqp.impl.TestUtils.sync; +import static com.rabbitmq.client.amqp.impl.TestUtils.waitUntilStable; +import static java.nio.charset.StandardCharsets.*; import static java.util.stream.IntStream.range; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.rabbitmq.client.amqp.*; import com.rabbitmq.client.amqp.impl.TestUtils.Sync; import java.time.Duration; import java.time.Instant; -import java.util.List; -import java.util.SortedSet; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import org.assertj.core.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInfo; -import org.junit.jupiter.api.extension.ExtendWith; +import java.util.function.UnaryOperator; +import net.jqwik.api.Arbitraries; +import net.jqwik.api.arbitraries.ArrayArbitrary; +import org.apache.qpid.protonj2.types.Binary; +import org.apache.qpid.protonj2.types.Symbol; +import org.junit.jupiter.api.*; -@ExtendWith(AmqpTestInfrastructureExtension.class) +@AmqpTestInfrastructure public class SourceFiltersTest { Connection connection; String name; + ArrayArbitrary binaryArbitrary; @BeforeEach void init(TestInfo info) { this.name = TestUtils.name(info); + connection.management().queue(this.name).type(STREAM).declare(); + binaryArbitrary = Arbitraries.bytes().array(byte[].class).ofMinSize(10).ofMaxSize(20); + } + + @AfterEach + void tearDown() { + connection.management().queueDeletion().delete(this.name); } @Test void streamConsumerOptionsOffsetLong() { - connection.management().queue(name).type(Management.QueueType.STREAM).declare(); - try { - int messageCount = 100; - publish(messageCount); - - SortedSet offsets = new ConcurrentSkipListSet<>(); - Sync consumeSync = sync(messageCount); - Consumer consumer = - connection.consumerBuilder().queue(name).stream() - .offset(0L) - .builder() - .messageHandler( - (ctx, msg) -> { - ctx.accept(); - offsets.add((Long) msg.annotation("x-stream-offset")); - consumeSync.down(); - }) - .build(); - com.rabbitmq.client.amqp.impl.Assertions.assertThat(consumeSync).completes(); - SortedSet offsetTail = offsets.tailSet(offsets.last() / 2); - Assertions.assertThat(offsetTail.first()).isPositive(); - consumer.close(); - consumeSync.reset(offsetTail.size()); - consumer = - connection.consumerBuilder().stream() - .offset(offsetTail.first()) - .builder() - .queue(name) - .messageHandler( - (ctx, msg) -> { - ctx.accept(); - consumeSync.down(); - }) - .build(); - com.rabbitmq.client.amqp.impl.Assertions.assertThat(consumeSync).completes(); - consumer.close(); - } finally { - connection.management().queueDeletion().delete(name); - } + int messageCount = 100; + publish(messageCount); + + SortedSet offsets = new ConcurrentSkipListSet<>(); + Sync consumeSync = sync(messageCount); + Consumer consumer = + connection.consumerBuilder().queue(name).stream() + .offset(0L) + .builder() + .messageHandler( + (ctx, msg) -> { + ctx.accept(); + offsets.add((Long) msg.annotation("x-stream-offset")); + consumeSync.down(); + }) + .build(); + assertThat(consumeSync).completes(); + SortedSet offsetTail = offsets.tailSet(offsets.last() / 2); + assertThat(offsetTail.first()).isPositive(); + consumer.close(); + consumeSync.reset(offsetTail.size()); + consumer = + connection.consumerBuilder().stream() + .offset(offsetTail.first()) + .builder() + .queue(name) + .messageHandler( + (ctx, msg) -> { + ctx.accept(); + consumeSync.down(); + }) + .build(); + assertThat(consumeSync).completes(); + consumer.close(); } @Test void streamConsumerOptionsOffsetFirst() { - connection.management().queue(name).type(Management.QueueType.STREAM).declare(); - try { - int messageCount = 100; - publish(messageCount); - - Sync consumeSync = sync(messageCount); - Consumer consumer = - connection.consumerBuilder().queue(name).stream() - .offset(FIRST) - .builder() - .messageHandler( - (ctx, msg) -> { - ctx.accept(); - consumeSync.down(); - }) - .build(); - com.rabbitmq.client.amqp.impl.Assertions.assertThat(consumeSync).completes(); - consumer.close(); - } finally { - connection.management().queueDeletion().delete(name); - } + int messageCount = 100; + publish(messageCount); + + Sync consumeSync = sync(messageCount); + Consumer consumer = + connection.consumerBuilder().queue(name).stream() + .offset(FIRST) + .builder() + .messageHandler( + (ctx, msg) -> { + ctx.accept(); + consumeSync.down(); + }) + .build(); + assertThat(consumeSync).completes(); + consumer.close(); } @Test void streamConsumerOptionsOffsetLast() { - connection.management().queue(name).type(Management.QueueType.STREAM).declare(); - try { - int messageCount = 100; - publish(messageCount); - - Sync consumeSync = sync(1); - AtomicLong firstOffset = new AtomicLong(-1); - Consumer consumer = - connection.consumerBuilder().queue(name).stream() - .offset(LAST) - .builder() - .messageHandler( - (ctx, msg) -> { - ctx.accept(); - firstOffset.compareAndSet(-1, (Long) msg.annotation("x-stream-offset")); - consumeSync.down(); - }) - .build(); - com.rabbitmq.client.amqp.impl.Assertions.assertThat(consumeSync).completes(); - Assertions.assertThat(firstOffset).hasPositiveValue(); - consumer.close(); - } finally { - connection.management().queueDeletion().delete(name); - } + int messageCount = 100; + publish(messageCount); + + Sync consumeSync = sync(1); + AtomicLong firstOffset = new AtomicLong(-1); + Consumer consumer = + connection.consumerBuilder().queue(name).stream() + .offset(LAST) + .builder() + .messageHandler( + (ctx, msg) -> { + ctx.accept(); + firstOffset.compareAndSet(-1, (Long) msg.annotation("x-stream-offset")); + consumeSync.down(); + }) + .build(); + assertThat(consumeSync).completes(); + assertThat(firstOffset).hasPositiveValue(); + consumer.close(); } @Test void streamConsumerOptionsOffsetNext() { - connection.management().queue(name).type(Management.QueueType.STREAM).declare(); - try { - int messageCount = 100; - publish(messageCount); - - Sync consumeSync = sync(messageCount); - Sync openSync = sync(1); - AtomicLong firstOffset = new AtomicLong(-1); - Consumer consumer = - connection.consumerBuilder().queue(name).stream() - .offset(NEXT) - .builder() - .listeners( - context -> { - if (context.currentState() == Resource.State.OPEN) { - openSync.down(); - } - }) - .messageHandler( - (ctx, msg) -> { - ctx.accept(); - firstOffset.compareAndSet(-1, (Long) msg.annotation("x-stream-offset")); - consumeSync.down(); - }) - .build(); - com.rabbitmq.client.amqp.impl.Assertions.assertThat(openSync).completes(); - publish(messageCount); - com.rabbitmq.client.amqp.impl.Assertions.assertThat(consumeSync).completes(); - Assertions.assertThat(firstOffset).hasPositiveValue(); - consumer.close(); - } finally { - connection.management().queueDeletion().delete(name); - } + int messageCount = 100; + publish(messageCount); + + Sync consumeSync = sync(messageCount); + Sync openSync = sync(1); + AtomicLong firstOffset = new AtomicLong(-1); + Consumer consumer = + connection.consumerBuilder().queue(name).stream() + .offset(NEXT) + .builder() + .listeners( + context -> { + if (context.currentState() == Resource.State.OPEN) { + openSync.down(); + } + }) + .messageHandler( + (ctx, msg) -> { + ctx.accept(); + firstOffset.compareAndSet(-1, (Long) msg.annotation("x-stream-offset")); + consumeSync.down(); + }) + .build(); + assertThat(openSync).completes(); + publish(messageCount); + assertThat(consumeSync).completes(); + assertThat(firstOffset).hasPositiveValue(); + consumer.close(); } @Test void streamConsumerOptionsOffsetTimestamp() { - connection.management().queue(name).type(Management.QueueType.STREAM).declare(); - try { - int messageCount = 100; - publish(messageCount); - - Sync consumeSync = sync(messageCount); - Consumer consumer = - connection.consumerBuilder().queue(name).stream() - .offset(Instant.now().minus(Duration.ofMinutes(10))) - .builder() - .messageHandler( - (ctx, msg) -> { - ctx.accept(); - consumeSync.down(); - }) - .build(); - com.rabbitmq.client.amqp.impl.Assertions.assertThat(consumeSync).completes(); - consumer.close(); - } finally { - connection.management().queueDeletion().delete(name); - } + int messageCount = 100; + publish(messageCount); + + Sync consumeSync = sync(messageCount); + Consumer consumer = + connection.consumerBuilder().queue(name).stream() + .offset(Instant.now().minus(Duration.ofMinutes(10))) + .builder() + .messageHandler( + (ctx, msg) -> { + ctx.accept(); + consumeSync.down(); + }) + .build(); + assertThat(consumeSync).completes(); + consumer.close(); } @Test void streamConsumerOptionsOffsetInterval() { - connection.management().queue(name).type(Management.QueueType.STREAM).declare(); - try { - int messageCount = 100; - publish(messageCount); - - Sync consumeSync = sync(messageCount); - Consumer consumer = - connection.consumerBuilder().queue(name).stream() - .offset("10m") - .builder() - .messageHandler( - (ctx, msg) -> { - ctx.accept(); - consumeSync.down(); - }) - .build(); - com.rabbitmq.client.amqp.impl.Assertions.assertThat(consumeSync).completes(); - consumer.close(); - } finally { - connection.management().queueDeletion().delete(name); - } + int messageCount = 100; + publish(messageCount); + + Sync consumeSync = sync(messageCount); + Consumer consumer = + connection.consumerBuilder().queue(name).stream() + .offset("10m") + .builder() + .messageHandler( + (ctx, msg) -> { + ctx.accept(); + consumeSync.down(); + }) + .build(); + assertThat(consumeSync).completes(); + consumer.close(); } @Test @@ -239,72 +223,189 @@ void streamConsumerOptionsOffsetIntervalWithInvalidSyntaxShouldThrow() { @Test void streamFiltering() { - connection.management().queue(name).type(Management.QueueType.STREAM).declare(); - try { - int messageWaveCount = 100; - List waves = List.of("apple", "orange", "", "banana"); - waves.forEach(v -> publish(messageWaveCount, v)); - int waveCount = waves.size(); - - AtomicInteger receivedCount = new AtomicInteger(0); - Consumer consumer = - connection.consumerBuilder().queue(name).stream() - .offset(FIRST) - .filterValues("banana") - .filterMatchUnfiltered(false) - .builder() - .messageHandler( - (ctx, msg) -> { - receivedCount.incrementAndGet(); - ctx.accept(); - }) - .build(); - TestUtils.waitUntilStable(receivedCount::get); - Assertions.assertThat(receivedCount) - .hasValueGreaterThanOrEqualTo(messageWaveCount) - .hasValueLessThan(waveCount * messageWaveCount); - consumer.close(); - - receivedCount.set(0); - consumer = - connection.consumerBuilder().queue(name).stream() - .offset(FIRST) - .filterValues("banana") - .filterMatchUnfiltered(true) - .builder() - .messageHandler( - (ctx, msg) -> { - receivedCount.incrementAndGet(); - ctx.accept(); - }) - .build(); - TestUtils.waitUntilStable(receivedCount::get); - Assertions.assertThat(receivedCount) - .hasValueGreaterThanOrEqualTo(2 * messageWaveCount) - .hasValueLessThan(waveCount * messageWaveCount); - consumer.close(); - } finally { - connection.management().queueDeletion().delete(name); - } + int messageWaveCount = 100; + List waves = List.of("apple", "orange", "", "banana"); + waves.forEach(v -> publish(messageWaveCount, v)); + int waveCount = waves.size(); + + AtomicInteger receivedCount = new AtomicInteger(0); + Consumer consumer = + connection.consumerBuilder().queue(name).stream() + .offset(FIRST) + .filterValues("banana") + .filterMatchUnfiltered(false) + .builder() + .messageHandler( + (ctx, msg) -> { + receivedCount.incrementAndGet(); + ctx.accept(); + }) + .build(); + waitUntilStable(receivedCount::get); + assertThat(receivedCount) + .hasValueGreaterThanOrEqualTo(messageWaveCount) + .hasValueLessThan(waveCount * messageWaveCount); + consumer.close(); + + receivedCount.set(0); + consumer = + connection.consumerBuilder().queue(name).stream() + .offset(FIRST) + .filterValues("banana") + .filterMatchUnfiltered(true) + .builder() + .messageHandler( + (ctx, msg) -> { + receivedCount.incrementAndGet(); + ctx.accept(); + }) + .build(); + waitUntilStable(receivedCount::get); + assertThat(receivedCount) + .hasValueGreaterThanOrEqualTo(2 * messageWaveCount) + .hasValueLessThan(waveCount * messageWaveCount); + consumer.close(); + } + + @Test + @Disabled + void filterExpressionApplicationProperties() { + int messageCount = 10; + UUID uuid = UUID.randomUUID(); + long now = System.currentTimeMillis(); + byte[] binary = binaryArbitrary.sample(); + publish(messageCount, msg -> msg.property("foo", true)); + publish(messageCount, msg -> msg.property("foo", 42)); + publish(messageCount, msg -> msg.property("foo", 42.1)); + publish(messageCount, msg -> msg.property("foo", now)); + publish(messageCount, msg -> msg.property("foo", uuid)); + publish(messageCount, msg -> msg.property("foo", binary)); + publish(messageCount, msg -> msg.property("foo", "bar")); + publish(messageCount, msg -> msg.property("foo", "baz")); + publish(messageCount, msg -> msg.property("foo", "bar")); + publish(messageCount, msg -> msg.propertySymbol("foo", "symbol")); + publish(messageCount, msg -> msg.property("foo", "bar").property("k1", 42)); + + Collection msgs = consume(messageCount, options -> options.property("foo", true)); + msgs.forEach(m -> assertThat(m).hasProperty("foo", true)); + + msgs = consume(messageCount, options -> options.property("foo", 42)); + msgs.forEach(m -> assertThat(m).hasProperty("foo", 42)); + + msgs = consume(messageCount, options -> options.property("foo", 42.1)); + msgs.forEach(m -> assertThat(m).hasProperty("foo", 42.1)); + + msgs = consume(messageCount, options -> options.propertyTimestamp("foo", now)); + msgs.forEach(m -> assertThat(m).hasProperty("foo", now)); + + msgs = consume(messageCount, options -> options.property("foo", uuid)); + msgs.forEach(m -> assertThat(m).hasProperty("foo", uuid)); + + msgs = consume(messageCount, options -> options.property("foo", binary)); + msgs.forEach(m -> assertThat(m).hasProperty("foo", new Binary(binary))); + + msgs = consume(messageCount, options -> options.property("foo", "baz")); + msgs.forEach(m -> assertThat(m).hasProperty("foo", "baz")); + + msgs = consume(messageCount * 3, options -> options.property("foo", "bar")); + msgs.forEach(m -> assertThat(m).hasProperty("foo", "bar")); + + msgs = consume(messageCount * 4, options -> options.property("foo", "$p:b")); + assertThat(msgs).allMatch(m -> m.property("foo").toString().startsWith("b")); + + msgs = consume(messageCount, options -> options.propertySymbol("foo", "symbol")); + msgs.forEach(m -> assertThat(m).hasProperty("foo", Symbol.valueOf("symbol"))); + + msgs = consume(messageCount, options -> options.property("foo", "bar").property("k1", 42)); + msgs.forEach(m -> assertThat(m).hasProperty("foo", "bar").hasProperty("k1", 42)); + } + + @Test + @Disabled + void filterExpressionProperties() { + int messageCount = 10; + byte[] userId = "guest".getBytes(UTF_8); + byte[] binary1 = binaryArbitrary.sample(); + byte[] binary2 = binaryArbitrary.sample(); + UUID uuid = UUID.randomUUID(); + publish(messageCount, msg -> msg.messageId(42)); + publish(messageCount, msg -> msg.messageId(42 * 2)); + publish(messageCount, msg -> msg.messageId(uuid)); + publish(messageCount, msg -> msg.correlationId(42)); + publish(messageCount, msg -> msg.correlationId(42 * 2)); + publish(messageCount, msg -> msg.correlationId(uuid)); + publish(messageCount, msg -> msg.userId(userId)); + publish(messageCount, msg -> msg.to("foo bar")); + publish(messageCount, msg -> msg.to("foo baz")); + publish(messageCount, msg -> msg.subject("foo bar")); + publish(messageCount, msg -> msg.subject("foo baz")); + + Collection msgs = consume(messageCount, options -> options.messageId(42)); + msgs.forEach(m -> assertThat(m).hasId(42)); + + msgs = consume(messageCount, options -> options.messageId(uuid)); + msgs.forEach(m -> assertThat(m).hasId(uuid)); + + msgs = consume(messageCount, options -> options.correlationId(42)); + msgs.forEach(m -> assertThat(m).hasCorrelationId(42)); + + msgs = consume(messageCount, options -> options.correlationId(uuid)); + msgs.forEach(m -> assertThat(m).hasCorrelationId(uuid)); + + msgs = consume(messageCount, options -> options.userId(userId)); + msgs.forEach(m -> assertThat(m).hasUserId(userId)); + + msgs = consume(messageCount, options -> options.to("foo bar")); + msgs.forEach(m -> assertThat(m).hasTo("foo bar")); + + msgs = consume(messageCount, options -> options.subject("foo bar")); + msgs.forEach(m -> assertThat(m).hasSubject("foo bar")); } void publish(int messageCount) { - this.publish(messageCount, null); + this.publish(messageCount, UnaryOperator.identity()); } void publish(int messageCount, String filterValue) { + publish(messageCount, msg -> msg.annotation("x-stream-filter-value", filterValue)); + } + + void publish(int messageCount, UnaryOperator messageLogic) { try (Publisher publisher = connection.publisherBuilder().queue(name).build()) { Sync publishSync = sync(messageCount); + Publisher.Callback callback = ctx -> publishSync.down(); range(0, messageCount) - .forEach( - ignored -> { - Message message = publisher.message(); - if (filterValue != null && !filterValue.isBlank()) { - message.annotation("x-stream-filter-value", filterValue); - } - publisher.publish(message, ctx -> publishSync.down()); - }); - com.rabbitmq.client.amqp.impl.Assertions.assertThat(publishSync).completes(); + .forEach(ignored -> publisher.publish(messageLogic.apply(publisher.message()), callback)); + assertThat(publishSync).completes(); } } + + Collection consume( + int expectedMessageCount, + java.util.function.Consumer filterOptions) { + Set messages = ConcurrentHashMap.newKeySet(expectedMessageCount); + Sync consumedSync = sync(expectedMessageCount); + AtomicInteger receivedMessageCount = new AtomicInteger(); + ConsumerBuilder builder = + connection + .consumerBuilder() + .queue(this.name) + .messageHandler( + (ctx, msg) -> { + messages.add(msg); + receivedMessageCount.incrementAndGet(); + consumedSync.down(); + ctx.accept(); + }); + + filterOptions.accept(builder.stream().offset(FIRST).filter()); + + try (Consumer ignored = builder.build()) { + assertThat(consumedSync).completes(); + waitUntilStable(receivedMessageCount::get, Duration.ofMillis(50)); + assertThat(receivedMessageCount).hasValue(expectedMessageCount); + } + + return messages; + } } diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/TestUtils.java b/src/test/java/com/rabbitmq/client/amqp/impl/TestUtils.java index 7f29caf86..40552d18a 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/TestUtils.java @@ -58,8 +58,11 @@ static void submitTask(Runnable task) { } static T waitUntilStable(Supplier call) { + return waitUntilStable(call, Duration.ofMillis(200)); + } + + static T waitUntilStable(Supplier call, Duration waitTime) { Duration timeout = Duration.ofSeconds(10); - Duration waitTime = Duration.ofMillis(200); Duration waitedTime = Duration.ZERO; T newValue = null; while (waitedTime.compareTo(timeout) <= 0) { From fbb3d4e0932c93da52d766e9180692ad68a8c8e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 7 Oct 2024 11:59:16 +0200 Subject: [PATCH 2/4] Support filter expressions References rabbitmq/rabbitmq-server#12415 --- .github/workflows/test-pr.yml | 2 + .../rabbitmq/client/amqp/ConsumerBuilder.java | 16 ++ .../client/amqp/impl/AmqpConsumerBuilder.java | 40 +++++ .../client/amqp/impl/AmqpMessage.java | 2 +- .../rabbitmq/client/amqp/impl/Assertions.java | 72 ++++++--- .../client/amqp/impl/SourceFiltersTest.java | 137 +++++++++++++++--- 6 files changed, 232 insertions(+), 37 deletions(-) diff --git a/.github/workflows/test-pr.yml b/.github/workflows/test-pr.yml index ab4aa126f..08e4c06c1 100644 --- a/.github/workflows/test-pr.yml +++ b/.github/workflows/test-pr.yml @@ -24,6 +24,8 @@ jobs: cache: 'maven' - name: Start broker run: ci/start-broker.sh + env: + RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:amqp-filtex' - name: Start toxiproxy run: ci/start-toxiproxy.sh - name: Display Java version diff --git a/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java b/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java index 1eb1a1a1f..d26257267 100644 --- a/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java @@ -209,6 +209,22 @@ interface FilterOptions { T subject(String subject); + T replyTo(String replyTo); + + T contentType(String contentType); + + T contentEncoding(String contentEncoding); + + T absoluteExpiryTime(long absoluteExpiryTime); + + T creationTime(long creationTime); + + T groupId(String groupId); + + T groupSequence(int groupSequence); + + T replyToGroupId(String groupId); + T property(String key, boolean value); T property(String key, byte value); diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java index dac13edbc..9875dca5e 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java @@ -296,6 +296,46 @@ public StreamFilterOptions subject(String subject) { return propertyFilter("subject", subject); } + @Override + public StreamFilterOptions replyTo(String replyTo) { + return propertyFilter("reply-to", replyTo); + } + + @Override + public StreamFilterOptions contentType(String contentType) { + return propertyFilter("content-type", Symbol.valueOf(contentType)); + } + + @Override + public StreamFilterOptions contentEncoding(String contentEncoding) { + return propertyFilter("content-encoding", Symbol.valueOf(contentEncoding)); + } + + @Override + public StreamFilterOptions absoluteExpiryTime(long absoluteExpiryTime) { + return propertyFilter("absolute-expiry-time", new Date(absoluteExpiryTime)); + } + + @Override + public StreamFilterOptions creationTime(long creationTime) { + return propertyFilter("creation-time", new Date(creationTime)); + } + + @Override + public StreamFilterOptions groupId(String groupId) { + return propertyFilter("group-id", groupId); + } + + @Override + public StreamFilterOptions groupSequence(int groupSequence) { + return propertyFilter("group-sequence", UnsignedInteger.valueOf(groupSequence)); + } + + @Override + public StreamFilterOptions replyToGroupId(String groupId) { + return propertyFilter("reply-to-group-id", groupId); + } + @Override public StreamFilterOptions property(String key, boolean value) { return this.applicationPropertyFilter(key, value); diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java index 18931888a..92f37cf22 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java @@ -241,7 +241,7 @@ public Message groupSequence(int groupSequence) { @Override public Message replyToGroupId(String groupId) { - callOnDelegate(m -> replyToGroupId(groupId)); + callOnDelegate(m -> m.replyToGroupId(groupId)); return this; } diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java b/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java index 54ca398bd..f5ff2c463 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java @@ -23,6 +23,7 @@ import com.rabbitmq.client.amqp.Message; import java.time.Duration; import java.util.List; +import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -232,11 +233,7 @@ MessageAssert hasId(long id) { } MessageAssert hasId(Object id) { - isNotNull(); - if (!actual.messageId().equals(id)) { - fail("Message ID should be '%s' but is '%s'", id, actual.messageId()); - } - return this; + return hasField("id", actual.messageId(), id); } MessageAssert hasCorrelationId(long correlationId) { @@ -244,11 +241,7 @@ MessageAssert hasCorrelationId(long correlationId) { } MessageAssert hasCorrelationId(Object id) { - isNotNull(); - if (!actual.correlationId().equals(id)) { - fail("Correlation ID should be '%s' but is '%s'", id, actual.correlationId()); - } - return this; + return hasField("correlation-id", actual.correlationId(), id); } MessageAssert hasUserId(byte[] userId) { @@ -258,18 +251,55 @@ MessageAssert hasUserId(byte[] userId) { } MessageAssert hasTo(String to) { + return hasField("to", actual.to(), to); + } + + MessageAssert hasSubject(String subject) { + return hasField("subject", actual.subject(), subject); + } + + MessageAssert hasReplyTo(String replyTo) { + return hasField("reply-to", actual.replyTo(), replyTo); + } + + MessageAssert hasContentType(String contentType) { + return hasField("content-type", actual.contentType(), contentType); + } + + MessageAssert hasContentEncoding(String contentEncoding) { + return hasField("content-encoding", actual.contentEncoding(), contentEncoding); + } + + MessageAssert hasAbsoluteExpiryTime(long absoluteExpiryTime) { isNotNull(); - if (!actual.to().equals(to)) { - fail("To field should be '%s' but is '%s'", to, actual.to()); - } + org.assertj.core.api.Assertions.assertThat(actual.absoluteExpiryTime()) + .isEqualTo(absoluteExpiryTime); return this; } - MessageAssert hasSubject(String subject) { + MessageAssert hasCreationTime(long creationTime) { isNotNull(); - if (!actual.subject().equals(subject)) { - fail("Subject should be '%s' but is '%s'", subject, actual.subject()); - } + org.assertj.core.api.Assertions.assertThat(actual.creationTime()).isEqualTo(creationTime); + return this; + } + + MessageAssert hasGroupId(String groupId) { + return hasField("group-id", actual.groupId(), groupId); + } + + MessageAssert hasGroupSequence(long groupSequence) { + isNotNull(); + org.assertj.core.api.Assertions.assertThat(actual.groupSequence()).isEqualTo(groupSequence); + return this; + } + + MessageAssert hasReplyToGroupId(String groupId) { + return hasField("reply-to-group-id", actual.replyToGroupId(), groupId); + } + + MessageAssert hasBody(byte[] body) { + isNotNull(); + org.assertj.core.api.Assertions.assertThat(actual.body()).isEqualTo(body); return this; } @@ -324,6 +354,14 @@ MessageAssert doesNotHaveAnnotation(String key) { } return this; } + + private MessageAssert hasField(String fieldLabel, Object value, Object expected) { + isNotNull(); + if (!Objects.equals(value, expected)) { + fail("Field '%s' should be '%s' but is '%s'", fieldLabel, expected, value); + } + return this; + } } static class ConnectionAssert extends AbstractObjectAssert { diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java index c8609b83d..a0cfc9019 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java @@ -32,13 +32,15 @@ import java.time.Duration; import java.time.Instant; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.UnaryOperator; import net.jqwik.api.Arbitraries; import net.jqwik.api.arbitraries.ArrayArbitrary; +import net.jqwik.api.arbitraries.IntegerArbitrary; +import net.jqwik.api.arbitraries.StringArbitrary; import org.apache.qpid.protonj2.types.Binary; import org.apache.qpid.protonj2.types.Symbol; import org.junit.jupiter.api.*; @@ -49,12 +51,16 @@ public class SourceFiltersTest { Connection connection; String name; ArrayArbitrary binaryArbitrary; + StringArbitrary stringArbitrary; + IntegerArbitrary integerArbitrary; @BeforeEach void init(TestInfo info) { this.name = TestUtils.name(info); connection.management().queue(this.name).type(STREAM).declare(); binaryArbitrary = Arbitraries.bytes().array(byte[].class).ofMinSize(10).ofMaxSize(20); + stringArbitrary = Arbitraries.strings().ofMinLength(10).ofMaxLength(20); + integerArbitrary = Arbitraries.integers(); } @AfterEach @@ -268,7 +274,6 @@ void streamFiltering() { } @Test - @Disabled void filterExpressionApplicationProperties() { int messageCount = 10; UUID uuid = UUID.randomUUID(); @@ -321,24 +326,39 @@ void filterExpressionApplicationProperties() { } @Test - @Disabled void filterExpressionProperties() { int messageCount = 10; byte[] userId = "guest".getBytes(UTF_8); - byte[] binary1 = binaryArbitrary.sample(); - byte[] binary2 = binaryArbitrary.sample(); + long now = System.currentTimeMillis(); + long later = now + Duration.ofMinutes(10).toMillis(); UUID uuid = UUID.randomUUID(); publish(messageCount, msg -> msg.messageId(42)); publish(messageCount, msg -> msg.messageId(42 * 2)); publish(messageCount, msg -> msg.messageId(uuid)); - publish(messageCount, msg -> msg.correlationId(42)); - publish(messageCount, msg -> msg.correlationId(42 * 2)); + publish(messageCount, msg -> msg.correlationId(43)); + publish(messageCount, msg -> msg.correlationId(43 * 2)); publish(messageCount, msg -> msg.correlationId(uuid)); publish(messageCount, msg -> msg.userId(userId)); - publish(messageCount, msg -> msg.to("foo bar")); - publish(messageCount, msg -> msg.to("foo baz")); - publish(messageCount, msg -> msg.subject("foo bar")); - publish(messageCount, msg -> msg.subject("foo baz")); + publish(messageCount, msg -> msg.to("to foo bar")); + publish(messageCount, msg -> msg.to("to foo baz")); + publish(messageCount, msg -> msg.subject("subject foo bar")); + publish(messageCount, msg -> msg.subject("subject foo baz")); + publish(messageCount, msg -> msg.replyTo("reply-to foo bar")); + publish(messageCount, msg -> msg.replyTo("reply-to foo baz")); + publish(messageCount, msg -> msg.contentType("text/plain")); + publish(messageCount, msg -> msg.contentType("text/html")); + publish(messageCount, msg -> msg.contentEncoding("gzip")); + publish(messageCount, msg -> msg.contentEncoding("zstd")); + publish(messageCount, msg -> msg.absoluteExpiryTime(now)); + publish(messageCount, msg -> msg.absoluteExpiryTime(later)); + publish(messageCount, msg -> msg.creationTime(now)); + publish(messageCount, msg -> msg.creationTime(later)); + publish(messageCount, msg -> msg.groupId("group-id foo bar")); + publish(messageCount, msg -> msg.groupId("group-id foo baz")); + publish(messageCount, msg -> msg.groupSequence(44)); + publish(messageCount, msg -> msg.groupSequence(44 * 2)); + publish(messageCount, msg -> msg.replyToGroupId("reply-to-group-id foo bar")); + publish(messageCount, msg -> msg.replyToGroupId("reply-to-group-id foo baz")); Collection msgs = consume(messageCount, options -> options.messageId(42)); msgs.forEach(m -> assertThat(m).hasId(42)); @@ -346,8 +366,8 @@ void filterExpressionProperties() { msgs = consume(messageCount, options -> options.messageId(uuid)); msgs.forEach(m -> assertThat(m).hasId(uuid)); - msgs = consume(messageCount, options -> options.correlationId(42)); - msgs.forEach(m -> assertThat(m).hasCorrelationId(42)); + msgs = consume(messageCount, options -> options.correlationId(43)); + msgs.forEach(m -> assertThat(m).hasCorrelationId(43)); msgs = consume(messageCount, options -> options.correlationId(uuid)); msgs.forEach(m -> assertThat(m).hasCorrelationId(uuid)); @@ -355,10 +375,89 @@ void filterExpressionProperties() { msgs = consume(messageCount, options -> options.userId(userId)); msgs.forEach(m -> assertThat(m).hasUserId(userId)); - msgs = consume(messageCount, options -> options.to("foo bar")); - msgs.forEach(m -> assertThat(m).hasTo("foo bar")); + msgs = consume(messageCount, options -> options.to("to foo bar")); + msgs.forEach(m -> assertThat(m).hasTo("to foo bar")); + + msgs = consume(messageCount, options -> options.subject("subject foo bar")); + msgs.forEach(m -> assertThat(m).hasSubject("subject foo bar")); + + msgs = consume(messageCount, options -> options.replyTo("reply-to foo bar")); + msgs.forEach(m -> assertThat(m).hasReplyTo("reply-to foo bar")); + + msgs = consume(messageCount, options -> options.contentType("text/html")); + msgs.forEach(m -> assertThat(m).hasContentType("text/html")); + + msgs = consume(messageCount, options -> options.contentEncoding("zstd")); + msgs.forEach(m -> assertThat(m).hasContentEncoding("zstd")); + + msgs = consume(messageCount, options -> options.absoluteExpiryTime(later)); + msgs.forEach(m -> assertThat(m).hasAbsoluteExpiryTime(later)); + + msgs = consume(messageCount, options -> options.creationTime(later)); + msgs.forEach(m -> assertThat(m).hasCreationTime(later)); + + msgs = consume(messageCount, options -> options.groupId("group-id foo baz")); + msgs.forEach(m -> assertThat(m).hasGroupId("group-id foo baz")); + + msgs = consume(messageCount, options -> options.groupSequence(44)); + msgs.forEach(m -> assertThat(m).hasGroupSequence(44)); + + msgs = consume(messageCount, options -> options.replyToGroupId("reply-to-group-id foo baz")); + msgs.forEach(m -> assertThat(m).hasReplyToGroupId("reply-to-group-id foo baz")); + } + + @Test + void filterExpressionsPropertiesAndApplicationProperties() { + int messageCount = 10; + String subject = stringArbitrary.sample(); + String appKey = stringArbitrary.sample(); + int appValue = integerArbitrary.sample(); + byte[] body1 = binaryArbitrary.sample(); + byte[] body2 = binaryArbitrary.sample(); + byte[] body3 = binaryArbitrary.sample(); + + publish(messageCount, msg -> msg.subject(subject).body(body1)); + publish(messageCount, msg -> msg.property(appKey, appValue).body(body2)); + publish(messageCount, msg -> msg.subject(subject).property(appKey, appValue).body(body3)); + + List msgs = consume(messageCount * 2, options -> options.subject(subject)); + msgs.subList(0, messageCount).forEach(m -> assertThat(m).hasSubject(subject).hasBody(body1)); + msgs.subList(messageCount, messageCount * 2) + .forEach(m -> assertThat(m).hasSubject(subject).hasBody(body3)); + + msgs = consume(messageCount * 2, options -> options.property(appKey, appValue)); + msgs.subList(0, messageCount) + .forEach(m -> assertThat(m).hasProperty(appKey, appValue).hasBody(body2)); + msgs.subList(messageCount, messageCount * 2) + .forEach(m -> assertThat(m).hasProperty(appKey, appValue).hasBody(body3)); + + msgs = consume(messageCount, options -> options.subject(subject).property(appKey, appValue)); + msgs.subList(0, messageCount) + .forEach( + m -> assertThat(m).hasSubject(subject).hasProperty(appKey, appValue).hasBody(body3)); + } + + @Test + void filterExpressionFilterFewMessagesFromManyToTestFlowControl() { + String groupId = stringArbitrary.sample(); + publish(1, m -> m.groupId(groupId)); + publish(1000); + publish(1, m -> m.groupId(groupId)); + + List msgs = consume(2, m -> m.groupId(groupId)); + msgs.forEach(m -> assertThat(m).hasGroupId(groupId)); + } + + @Test + void filterExpressionStringModifier() { + publish(1, m -> m.subject("abc 123")); + publish(1, m -> m.subject("foo bar")); + publish(1, m -> m.subject("ab 12")); + + List msgs = consume(2, m -> m.subject("$p:ab")); + msgs.forEach(m -> assertThat(m.subject()).startsWith("ab")); - msgs = consume(messageCount, options -> options.subject("foo bar")); + msgs = consume(1, m -> m.subject("$s:bar")); msgs.forEach(m -> assertThat(m).hasSubject("foo bar")); } @@ -380,10 +479,10 @@ void publish(int messageCount, UnaryOperator messageLogic) { } } - Collection consume( + List consume( int expectedMessageCount, java.util.function.Consumer filterOptions) { - Set messages = ConcurrentHashMap.newKeySet(expectedMessageCount); + Queue messages = new LinkedBlockingQueue<>(); Sync consumedSync = sync(expectedMessageCount); AtomicInteger receivedMessageCount = new AtomicInteger(); ConsumerBuilder builder = @@ -406,6 +505,6 @@ Collection consume( assertThat(receivedMessageCount).hasValue(expectedMessageCount); } - return messages; + return List.copyOf(messages); } } From 16fd3ee0704760868c9c8ecbff61ba67abec6a54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 8 Oct 2024 10:56:41 +0200 Subject: [PATCH 3/4] Check filter names in attach response --- .github/workflows/test-pr.yml | 2 +- .../client/amqp/impl/AmqpConnection.java | 7 ++++++ .../client/amqp/impl/AmqpConsumer.java | 22 ++++++++++++++++--- .../client/amqp/impl/AmqpConsumerBuilder.java | 8 +++++-- .../com/rabbitmq/client/amqp/impl/Utils.java | 8 +++++++ .../client/amqp/impl/SourceFiltersTest.java | 7 ++++++ .../client/amqp/impl/TestConditions.java | 3 ++- 7 files changed, 50 insertions(+), 7 deletions(-) diff --git a/.github/workflows/test-pr.yml b/.github/workflows/test-pr.yml index 08e4c06c1..4c1025804 100644 --- a/.github/workflows/test-pr.yml +++ b/.github/workflows/test-pr.yml @@ -25,7 +25,7 @@ jobs: - name: Start broker run: ci/start-broker.sh env: - RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:amqp-filtex' + RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:main' - name: Start toxiproxy run: ci/start-toxiproxy.sh - name: Display Java version diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java index aaac7b92b..d4090af8a 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java @@ -72,6 +72,7 @@ final class AmqpConnection extends ResourceBase implements Connection { private final ConnectionSettings.AffinityStrategy affinityStrategy; private final String name; private final Lock instanceLock = new ReentrantLock(); + private final boolean filterExpressionsSupported; AmqpConnection(AmqpConnectionBuilder builder) { super(builder.listeners()); @@ -126,6 +127,8 @@ final class AmqpConnection extends ResourceBase implements Connection { ConnectionUtils.NO_RETRY_STRATEGY, this.name()); this.sync(ncw); + this.filterExpressionsSupported = + Utils.supportFilterExpressions(brokerVersion(this.nativeConnection)); LOGGER.debug("Opened connection '{}' on node '{}'.", this.name(), this.connectionNodename()); this.state(OPEN); this.environment.metricsCollector().openConnection(); @@ -672,6 +675,10 @@ ConnectionUtils.AffinityContext affinity() { return this.affinity; } + boolean filterExpressionsSupported() { + return this.filterExpressionsSupported; + } + long id() { return this.id; } diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java index 5356ae13f..6fd587973 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java @@ -175,11 +175,27 @@ private ClientReceiver createNativeReceiver( .autoSettle(false) .creditWindow(0) .properties(properties); + Map localSourceFilters = Collections.emptyMap(); if (!filters.isEmpty()) { - receiverOptions.sourceOptions().filters(Map.copyOf(filters)); + localSourceFilters = Map.copyOf(filters); + receiverOptions.sourceOptions().filters(localSourceFilters); } - return (ClientReceiver) - ExceptionUtils.wrapGet(nativeSession.openReceiver(address, receiverOptions).openFuture()); + ClientReceiver receiver = + (ClientReceiver) + ExceptionUtils.wrapGet( + nativeSession.openReceiver(address, receiverOptions).openFuture()); + if (!filters.isEmpty()) { + Map remoteSourceFilters = receiver.source().filters(); + for (Map.Entry localEntry : localSourceFilters.entrySet()) { + if (!remoteSourceFilters.containsKey(localEntry.getKey())) { + LOGGER.warn( + "Missing filter value in attach response: {} => {}", + localEntry.getKey(), + localEntry.getValue()); + } + } + } + return receiver; } catch (ClientException e) { throw ExceptionUtils.convert(e, "Error while creating receiver from '%s'", address); } diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java index 9875dca5e..a0b20ae1f 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java @@ -136,10 +136,10 @@ public Consumer build() { private static class DefaultStreamOptions implements StreamOptions { private final Map filters; - private final ConsumerBuilder builder; + private final AmqpConsumerBuilder builder; private final StreamFilterOptions filterOptions; - private DefaultStreamOptions(ConsumerBuilder builder, Map filters) { + private DefaultStreamOptions(AmqpConsumerBuilder builder, Map filters) { this.builder = builder; this.filters = filters; this.filterOptions = new DefaultStreamFilterOptions(this, filters); @@ -190,6 +190,10 @@ public StreamOptions filterMatchUnfiltered(boolean matchUnfiltered) { @Override public StreamFilterOptions filter() { + if (!this.builder.connection.filterExpressionsSupported()) { + throw new IllegalArgumentException( + "AMQP filter expressions requires at least RabbitMQ 4.1.0"); + } return this.filterOptions; } diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java b/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java index 8c3744746..b846992a0 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java @@ -224,6 +224,14 @@ static boolean is4_0_OrMore(String brokerVersion) { return versionCompare(currentVersion(brokerVersion), "4.0.0") >= 0; } + static boolean is4_1_OrMore(String brokerVersion) { + return versionCompare(currentVersion(brokerVersion), "4.1.0") >= 0; + } + + static boolean supportFilterExpressions(String brokerVersion) { + return is4_1_OrMore(brokerVersion); + } + static final class ObservationConnectionInfo implements ObservationCollector.ConnectionInfo { private final String address; diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java index a0cfc9019..df7936551 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java @@ -20,6 +20,7 @@ import static com.rabbitmq.client.amqp.ConsumerBuilder.StreamOffsetSpecification.*; import static com.rabbitmq.client.amqp.Management.QueueType.STREAM; import static com.rabbitmq.client.amqp.impl.Assertions.assertThat; +import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_1_0; import static com.rabbitmq.client.amqp.impl.TestUtils.sync; import static com.rabbitmq.client.amqp.impl.TestUtils.waitUntilStable; import static java.nio.charset.StandardCharsets.*; @@ -28,6 +29,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.rabbitmq.client.amqp.*; +import com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersionAtLeast; import com.rabbitmq.client.amqp.impl.TestUtils.Sync; import java.time.Duration; import java.time.Instant; @@ -274,6 +276,7 @@ void streamFiltering() { } @Test + @BrokerVersionAtLeast(RABBITMQ_4_1_0) void filterExpressionApplicationProperties() { int messageCount = 10; UUID uuid = UUID.randomUUID(); @@ -326,6 +329,7 @@ void filterExpressionApplicationProperties() { } @Test + @BrokerVersionAtLeast(RABBITMQ_4_1_0) void filterExpressionProperties() { int messageCount = 10; byte[] userId = "guest".getBytes(UTF_8); @@ -407,6 +411,7 @@ void filterExpressionProperties() { } @Test + @BrokerVersionAtLeast(RABBITMQ_4_1_0) void filterExpressionsPropertiesAndApplicationProperties() { int messageCount = 10; String subject = stringArbitrary.sample(); @@ -438,6 +443,7 @@ void filterExpressionsPropertiesAndApplicationProperties() { } @Test + @BrokerVersionAtLeast(RABBITMQ_4_1_0) void filterExpressionFilterFewMessagesFromManyToTestFlowControl() { String groupId = stringArbitrary.sample(); publish(1, m -> m.groupId(groupId)); @@ -449,6 +455,7 @@ void filterExpressionFilterFewMessagesFromManyToTestFlowControl() { } @Test + @BrokerVersionAtLeast(RABBITMQ_4_1_0) void filterExpressionStringModifier() { publish(1, m -> m.subject("abc 123")); publish(1, m -> m.subject("foo bar")); diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java b/src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java index a56924cf1..017dda68d 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java @@ -34,7 +34,8 @@ public final class TestConditions { private TestConditions() {} public enum BrokerVersion { - RABBITMQ_4_0_3("4.0.3"); + RABBITMQ_4_0_3("4.0.3"), + RABBITMQ_4_1_0("4.1.0"); final String value; From 5e074610f8ed51eba8656b817c361e9d3fd79e76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 8 Oct 2024 15:45:37 +0200 Subject: [PATCH 4/4] Write Javadoc for AMQP filter expressions API --- .../rabbitmq/client/amqp/ConsumerBuilder.java | 313 +++++++++++++++++- .../amqp/docs/WebsiteDocumentation.java | 18 + 2 files changed, 330 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java b/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java index d26257267..7a4efbb87 100644 --- a/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java @@ -143,11 +143,15 @@ interface StreamOptions { StreamOptions offset(String interval); /** - * Filter values. + * Filter values for stream filtering. + * + *

This a different filtering mechanism from AMQP filter expressions. Both mechanisms can be + * used together. * * @param values filter values * @return stream options * @see Stream Filtering + * @see #filter() */ StreamOptions filterValues(String... values); @@ -156,11 +160,27 @@ interface StreamOptions { * *

Default is false (messages without a filter value are not sent). * + *

This a different filtering mechanism from AMQP filter expressions. Both mechanisms can be + * used together. + * * @param matchUnfiltered true to send messages without a filter value * @return stream options + * @see #filter() */ StreamOptions filterMatchUnfiltered(boolean matchUnfiltered); + /** + * Options for AMQP filter expressions. + * + *

Requires RabbitMQ 4.1 or more. + * + *

This a different filtering mechanism from stream filtering. Both mechanisms can be used + * together. + * + * @return the filter options + * @see #filterValues(String...) + * @see #filterMatchUnfiltered(boolean) + */ StreamFilterOptions filter(); /** @@ -181,93 +201,384 @@ enum StreamOffsetSpecification { NEXT } + /** + * Filter options for support of AMQP filter expressions. + * + *

AMQP filter expressions are supported only with streams. + * + *

This a different filtering mechanism from stream filtering. Both mechanisms can be used + * together. + * + *

Requires RabbitMQ 4.1 or more. + * + * @param type of the object returned by methods + * @see AMQP + * Filter Expressions + */ interface FilterOptions { + /** + * Filter on message ID. + * + * @param id message ID + * @return type-parameter object + */ T messageId(Object id); + /** + * Filter on message ID. + * + * @param id message ID + * @return type-parameter object + */ T messageId(String id); + /** + * Filter on message ID. + * + * @param id message ID + * @return type-parameter object + */ T messageId(long id); + /** + * Filter on message ID. + * + * @param id message ID + * @return type-parameter object + */ T messageId(byte[] id); + /** + * Filter on message ID. + * + * @param id message ID + * @return type-parameter object + */ T messageId(UUID id); + /** + * Filter on correlation ID. + * + * @param correlationId correlation ID + * @return type-parameter object + */ T correlationId(Object correlationId); + /** + * Filter on correlation ID. + * + * @param correlationId correlation ID + * @return type-parameter object + */ T correlationId(String correlationId); + /** + * Filter on correlation ID. + * + * @param correlationId correlation ID + * @return type-parameter object + */ T correlationId(long correlationId); + /** + * Filter on correlation ID. + * + * @param correlationId correlation ID + * @return type-parameter object + */ T correlationId(byte[] correlationId); + /** + * Filter on correlation ID. + * + * @param correlationId correlation ID + * @return type-parameter object + */ T correlationId(UUID correlationId); + /** + * Filter on user ID. + * + * @param userId user ID + * @return type-parameter object + */ T userId(byte[] userId); + /** + * Filter on to field. + * + * @param to to + * @return type-parameter object + */ T to(String to); + /** + * Filter on subject field. + * + * @param subject subject + * @return type-parameter object + */ T subject(String subject); + /** + * Filter on reply-to field. + * + * @param replyTo reply-to + * @return type-parameter object + */ T replyTo(String replyTo); + /** + * Filter on content-type field. + * + * @param contentType content-type + * @return type-parameter object + */ T contentType(String contentType); + /** + * Filter on content-encoding field. + * + * @param contentEncoding content-encoding + * @return type-parameter object + */ T contentEncoding(String contentEncoding); + /** + * Filter on absolute expiry time field. + * + * @param absoluteExpiryTime absolute expiry time + * @return type-parameter object + */ T absoluteExpiryTime(long absoluteExpiryTime); + /** + * Filter on creation time field. + * + * @param creationTime creation time + * @return type-parameter object + */ T creationTime(long creationTime); + /** + * Filter on group ID. + * + * @param groupId group ID + * @return type-parameter object + */ T groupId(String groupId); + /** + * Filter on group sequence. + * + * @param groupSequence group sequence + * @return type-parameter object + */ T groupSequence(int groupSequence); + /** + * Filter on reply-to group. + * + * @param groupId group ID + * @return type-parameter object + */ T replyToGroupId(String groupId); + /** + * Filter on boolean application property. + * + * @param key application property key + * @param value application property value + * @return type-parameter object + */ T property(String key, boolean value); + /** + * Filter on byte application property. + * + * @param key application property key + * @param value application property value + * @return type-parameter object + */ T property(String key, byte value); + /** + * Filter on short application property. + * + * @param key application property key + * @param value application property value + * @return type-parameter object + */ T property(String key, short value); + /** + * Filter on integer application property. + * + * @param key application property key + * @param value application property value + * @return type-parameter object + */ T property(String key, int value); + /** + * Filter on long application property. + * + * @param key application property key + * @param value application property value + * @return type-parameter object + */ T property(String key, long value); + /** + * Filter on unsigned byte application property. + * + * @param key application property key + * @param value application property value + * @return type-parameter object + */ T propertyUnsigned(String key, byte value); + /** + * Filter on unsigned short application property. + * + * @param key application property key + * @param value application property value + * @return type-parameter object + */ T propertyUnsigned(String key, short value); + /** + * Filter on unsigned integer application property. + * + * @param key application property key + * @param value application property value + * @return type-parameter object + */ T propertyUnsigned(String key, int value); + /** + * Filter on unsigned long application property. + * + * @param key application property key + * @param value application property value + * @return type-parameter object + */ T propertyUnsigned(String key, long value); + /** + * Filter on float application property. + * + * @param key application property key + * @param value application property value + * @return type-parameter object + */ T property(String key, float value); + /** + * Filter on double application property. + * + * @param key application property key + * @param value application property value + * @return type-parameter object + */ T property(String key, double value); + /** + * Filter on 32-bit decimal number application property. + * + * @param key application property key + * @param value application property value + * @return type-parameter object + */ T propertyDecimal32(String key, BigDecimal value); + /** + * Filter on 64-bit decimal number application property. + * + * @param key application property key + * @param value application property value + * @return type-parameter object + */ T propertyDecimal64(String key, BigDecimal value); + /** + * Filter on 128-bit decimal number application property. + * + * @param key application property key + * @param value application property value + * @return type-parameter object + */ T propertyDecimal128(String key, BigDecimal value); + /** + * Filter on character application property. + * + * @param key application property key + * @param value application property value + * @return type-parameter object + */ T property(String key, char value); + /** + * Filter on timestamp application property. + * + * @param key application property key + * @param value application property value + * @return type-parameter object + */ T propertyTimestamp(String key, long value); + /** + * Filter on UUID application property. + * + * @param key application property key + * @param value application property value + * @return type-parameter object + */ T property(String key, UUID value); + /** + * Filter on byte array application property. + * + * @param key application property key + * @param value application property value + * @return type-parameter object + */ T property(String key, byte[] value); + /** + * Filter on string application property. + * + * @param key application property key + * @param value application property value + * @return type-parameter object + */ T property(String key, String value); + /** + * Filter on symbol application property. + * + * @param key application property key + * @param value application property value + * @return type-parameter object + */ T propertySymbol(String key, String value); } + /** + * Filter options for support of AMQP filter expressions. + * + *

Specialized {@link FilterOptions} in the context of the configuration of a stream consumer. + */ interface StreamFilterOptions extends FilterOptions { + /** + * Go back to the stream options. + * + * @return the stream options + */ StreamOptions stream(); } diff --git a/src/test/java/com/rabbitmq/client/amqp/docs/WebsiteDocumentation.java b/src/test/java/com/rabbitmq/client/amqp/docs/WebsiteDocumentation.java index b3b10a4db..280f5efca 100644 --- a/src/test/java/com/rabbitmq/client/amqp/docs/WebsiteDocumentation.java +++ b/src/test/java/com/rabbitmq/client/amqp/docs/WebsiteDocumentation.java @@ -146,6 +146,24 @@ void consumingStreamFiltering() { } + void consumingAmqpFilterExpressions() { + Connection connection = null; + + Consumer consumer = connection.consumerBuilder() + .queue("some-stream") + .stream() + .filter() + .subject("$p:foo") // subject starts with 'foo' + .property("k1", "v1") // 'k1' application property equals to 'v1' + .stream() + .builder() + .messageHandler((context, message) -> { + // message processing + }) + .build(); + + } + void management() { Connection connection = null;