eu.rekawek.toxiproxy
toxiproxy-java
diff --git a/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java b/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java
index 64ab859b7..7a4efbb87 100644
--- a/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java
+++ b/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java
@@ -17,7 +17,10 @@
// info@rabbitmq.com.
package com.rabbitmq.client.amqp;
+import java.math.BigDecimal;
import java.time.Instant;
+import java.util.UUID;
+import org.apache.qpid.protonj2.types.*;
/** API to configure and create a {@link Consumer}. */
public interface ConsumerBuilder {
@@ -140,11 +143,15 @@ interface StreamOptions {
StreamOptions offset(String interval);
/**
- * Filter values.
+ * Filter values for stream filtering.
+ *
+ * This a different filtering mechanism from AMQP filter expressions. Both mechanisms can be
+ * used together.
*
* @param values filter values
* @return stream options
* @see Stream Filtering
+ * @see #filter()
*/
StreamOptions filterValues(String... values);
@@ -153,11 +160,29 @@ interface StreamOptions {
*
*
Default is false (messages without a filter value are not sent).
*
+ *
This a different filtering mechanism from AMQP filter expressions. Both mechanisms can be
+ * used together.
+ *
* @param matchUnfiltered true to send messages without a filter value
* @return stream options
+ * @see #filter()
*/
StreamOptions filterMatchUnfiltered(boolean matchUnfiltered);
+ /**
+ * Options for AMQP filter expressions.
+ *
+ *
Requires RabbitMQ 4.1 or more.
+ *
+ *
This a different filtering mechanism from stream filtering. Both mechanisms can be used
+ * together.
+ *
+ * @return the filter options
+ * @see #filterValues(String...)
+ * @see #filterMatchUnfiltered(boolean)
+ */
+ StreamFilterOptions filter();
+
/**
* Return the consumer builder.
*
@@ -176,6 +201,387 @@ enum StreamOffsetSpecification {
NEXT
}
+ /**
+ * Filter options for support of AMQP filter expressions.
+ *
+ *
AMQP filter expressions are supported only with streams.
+ *
+ *
This a different filtering mechanism from stream filtering. Both mechanisms can be used
+ * together.
+ *
+ *
Requires RabbitMQ 4.1 or more.
+ *
+ * @param type of the object returned by methods
+ * @see AMQP
+ * Filter Expressions
+ */
+ interface FilterOptions {
+
+ /**
+ * Filter on message ID.
+ *
+ * @param id message ID
+ * @return type-parameter object
+ */
+ T messageId(Object id);
+
+ /**
+ * Filter on message ID.
+ *
+ * @param id message ID
+ * @return type-parameter object
+ */
+ T messageId(String id);
+
+ /**
+ * Filter on message ID.
+ *
+ * @param id message ID
+ * @return type-parameter object
+ */
+ T messageId(long id);
+
+ /**
+ * Filter on message ID.
+ *
+ * @param id message ID
+ * @return type-parameter object
+ */
+ T messageId(byte[] id);
+
+ /**
+ * Filter on message ID.
+ *
+ * @param id message ID
+ * @return type-parameter object
+ */
+ T messageId(UUID id);
+
+ /**
+ * Filter on correlation ID.
+ *
+ * @param correlationId correlation ID
+ * @return type-parameter object
+ */
+ T correlationId(Object correlationId);
+
+ /**
+ * Filter on correlation ID.
+ *
+ * @param correlationId correlation ID
+ * @return type-parameter object
+ */
+ T correlationId(String correlationId);
+
+ /**
+ * Filter on correlation ID.
+ *
+ * @param correlationId correlation ID
+ * @return type-parameter object
+ */
+ T correlationId(long correlationId);
+
+ /**
+ * Filter on correlation ID.
+ *
+ * @param correlationId correlation ID
+ * @return type-parameter object
+ */
+ T correlationId(byte[] correlationId);
+
+ /**
+ * Filter on correlation ID.
+ *
+ * @param correlationId correlation ID
+ * @return type-parameter object
+ */
+ T correlationId(UUID correlationId);
+
+ /**
+ * Filter on user ID.
+ *
+ * @param userId user ID
+ * @return type-parameter object
+ */
+ T userId(byte[] userId);
+
+ /**
+ * Filter on to field.
+ *
+ * @param to to
+ * @return type-parameter object
+ */
+ T to(String to);
+
+ /**
+ * Filter on subject field.
+ *
+ * @param subject subject
+ * @return type-parameter object
+ */
+ T subject(String subject);
+
+ /**
+ * Filter on reply-to field.
+ *
+ * @param replyTo reply-to
+ * @return type-parameter object
+ */
+ T replyTo(String replyTo);
+
+ /**
+ * Filter on content-type field.
+ *
+ * @param contentType content-type
+ * @return type-parameter object
+ */
+ T contentType(String contentType);
+
+ /**
+ * Filter on content-encoding field.
+ *
+ * @param contentEncoding content-encoding
+ * @return type-parameter object
+ */
+ T contentEncoding(String contentEncoding);
+
+ /**
+ * Filter on absolute expiry time field.
+ *
+ * @param absoluteExpiryTime absolute expiry time
+ * @return type-parameter object
+ */
+ T absoluteExpiryTime(long absoluteExpiryTime);
+
+ /**
+ * Filter on creation time field.
+ *
+ * @param creationTime creation time
+ * @return type-parameter object
+ */
+ T creationTime(long creationTime);
+
+ /**
+ * Filter on group ID.
+ *
+ * @param groupId group ID
+ * @return type-parameter object
+ */
+ T groupId(String groupId);
+
+ /**
+ * Filter on group sequence.
+ *
+ * @param groupSequence group sequence
+ * @return type-parameter object
+ */
+ T groupSequence(int groupSequence);
+
+ /**
+ * Filter on reply-to group.
+ *
+ * @param groupId group ID
+ * @return type-parameter object
+ */
+ T replyToGroupId(String groupId);
+
+ /**
+ * Filter on boolean application property.
+ *
+ * @param key application property key
+ * @param value application property value
+ * @return type-parameter object
+ */
+ T property(String key, boolean value);
+
+ /**
+ * Filter on byte application property.
+ *
+ * @param key application property key
+ * @param value application property value
+ * @return type-parameter object
+ */
+ T property(String key, byte value);
+
+ /**
+ * Filter on short application property.
+ *
+ * @param key application property key
+ * @param value application property value
+ * @return type-parameter object
+ */
+ T property(String key, short value);
+
+ /**
+ * Filter on integer application property.
+ *
+ * @param key application property key
+ * @param value application property value
+ * @return type-parameter object
+ */
+ T property(String key, int value);
+
+ /**
+ * Filter on long application property.
+ *
+ * @param key application property key
+ * @param value application property value
+ * @return type-parameter object
+ */
+ T property(String key, long value);
+
+ /**
+ * Filter on unsigned byte application property.
+ *
+ * @param key application property key
+ * @param value application property value
+ * @return type-parameter object
+ */
+ T propertyUnsigned(String key, byte value);
+
+ /**
+ * Filter on unsigned short application property.
+ *
+ * @param key application property key
+ * @param value application property value
+ * @return type-parameter object
+ */
+ T propertyUnsigned(String key, short value);
+
+ /**
+ * Filter on unsigned integer application property.
+ *
+ * @param key application property key
+ * @param value application property value
+ * @return type-parameter object
+ */
+ T propertyUnsigned(String key, int value);
+
+ /**
+ * Filter on unsigned long application property.
+ *
+ * @param key application property key
+ * @param value application property value
+ * @return type-parameter object
+ */
+ T propertyUnsigned(String key, long value);
+
+ /**
+ * Filter on float application property.
+ *
+ * @param key application property key
+ * @param value application property value
+ * @return type-parameter object
+ */
+ T property(String key, float value);
+
+ /**
+ * Filter on double application property.
+ *
+ * @param key application property key
+ * @param value application property value
+ * @return type-parameter object
+ */
+ T property(String key, double value);
+
+ /**
+ * Filter on 32-bit decimal number application property.
+ *
+ * @param key application property key
+ * @param value application property value
+ * @return type-parameter object
+ */
+ T propertyDecimal32(String key, BigDecimal value);
+
+ /**
+ * Filter on 64-bit decimal number application property.
+ *
+ * @param key application property key
+ * @param value application property value
+ * @return type-parameter object
+ */
+ T propertyDecimal64(String key, BigDecimal value);
+
+ /**
+ * Filter on 128-bit decimal number application property.
+ *
+ * @param key application property key
+ * @param value application property value
+ * @return type-parameter object
+ */
+ T propertyDecimal128(String key, BigDecimal value);
+
+ /**
+ * Filter on character application property.
+ *
+ * @param key application property key
+ * @param value application property value
+ * @return type-parameter object
+ */
+ T property(String key, char value);
+
+ /**
+ * Filter on timestamp application property.
+ *
+ * @param key application property key
+ * @param value application property value
+ * @return type-parameter object
+ */
+ T propertyTimestamp(String key, long value);
+
+ /**
+ * Filter on UUID application property.
+ *
+ * @param key application property key
+ * @param value application property value
+ * @return type-parameter object
+ */
+ T property(String key, UUID value);
+
+ /**
+ * Filter on byte array application property.
+ *
+ * @param key application property key
+ * @param value application property value
+ * @return type-parameter object
+ */
+ T property(String key, byte[] value);
+
+ /**
+ * Filter on string application property.
+ *
+ * @param key application property key
+ * @param value application property value
+ * @return type-parameter object
+ */
+ T property(String key, String value);
+
+ /**
+ * Filter on symbol application property.
+ *
+ * @param key application property key
+ * @param value application property value
+ * @return type-parameter object
+ */
+ T propertySymbol(String key, String value);
+ }
+
+ /**
+ * Filter options for support of AMQP filter expressions.
+ *
+ * Specialized {@link FilterOptions} in the context of the configuration of a stream consumer.
+ */
+ interface StreamFilterOptions extends FilterOptions {
+
+ /**
+ * Go back to the stream options.
+ *
+ * @return the stream options
+ */
+ StreamOptions stream();
+ }
+
/**
* Callback to modify a consumer subscription before the link creation.
*
diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java
index aaac7b92b..d4090af8a 100644
--- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java
+++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java
@@ -72,6 +72,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
private final ConnectionSettings.AffinityStrategy affinityStrategy;
private final String name;
private final Lock instanceLock = new ReentrantLock();
+ private final boolean filterExpressionsSupported;
AmqpConnection(AmqpConnectionBuilder builder) {
super(builder.listeners());
@@ -126,6 +127,8 @@ final class AmqpConnection extends ResourceBase implements Connection {
ConnectionUtils.NO_RETRY_STRATEGY,
this.name());
this.sync(ncw);
+ this.filterExpressionsSupported =
+ Utils.supportFilterExpressions(brokerVersion(this.nativeConnection));
LOGGER.debug("Opened connection '{}' on node '{}'.", this.name(), this.connectionNodename());
this.state(OPEN);
this.environment.metricsCollector().openConnection();
@@ -672,6 +675,10 @@ ConnectionUtils.AffinityContext affinity() {
return this.affinity;
}
+ boolean filterExpressionsSupported() {
+ return this.filterExpressionsSupported;
+ }
+
long id() {
return this.id;
}
diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java
index 2e6a5935d..6fd587973 100644
--- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java
+++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java
@@ -41,6 +41,7 @@
import org.apache.qpid.protonj2.engine.impl.ProtonLinkCreditState;
import org.apache.qpid.protonj2.engine.impl.ProtonReceiver;
import org.apache.qpid.protonj2.engine.impl.ProtonSessionIncomingWindow;
+import org.apache.qpid.protonj2.types.DescribedType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +59,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
private final Long id;
private final String address;
private final String queue;
- private final Map filters;
+ private final Map filters;
private final Map linkProperties;
private final ConsumerBuilder.SubscriptionListener subscriptionListener;
private final AmqpConnection connection;
@@ -161,7 +162,7 @@ private ClientReceiver createNativeReceiver(
Session nativeSession,
String address,
Map properties,
- Map filters,
+ Map filters,
SubscriptionListener subscriptionListener) {
try {
filters = new LinkedHashMap<>(filters);
@@ -174,11 +175,27 @@ private ClientReceiver createNativeReceiver(
.autoSettle(false)
.creditWindow(0)
.properties(properties);
+ Map localSourceFilters = Collections.emptyMap();
if (!filters.isEmpty()) {
- receiverOptions.sourceOptions().filters(filters);
+ localSourceFilters = Map.copyOf(filters);
+ receiverOptions.sourceOptions().filters(localSourceFilters);
}
- return (ClientReceiver)
- ExceptionUtils.wrapGet(nativeSession.openReceiver(address, receiverOptions).openFuture());
+ ClientReceiver receiver =
+ (ClientReceiver)
+ ExceptionUtils.wrapGet(
+ nativeSession.openReceiver(address, receiverOptions).openFuture());
+ if (!filters.isEmpty()) {
+ Map remoteSourceFilters = receiver.source().filters();
+ for (Map.Entry localEntry : localSourceFilters.entrySet()) {
+ if (!remoteSourceFilters.containsKey(localEntry.getKey())) {
+ LOGGER.warn(
+ "Missing filter value in attach response: {} => {}",
+ localEntry.getKey(),
+ localEntry.getValue());
+ }
+ }
+ }
+ return receiver;
} catch (ClientException e) {
throw ExceptionUtils.convert(e, "Error while creating receiver from '%s'", address);
}
diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java
index 789fa3a96..a0b20ae1f 100644
--- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java
+++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java
@@ -22,8 +22,10 @@
import com.rabbitmq.client.amqp.Consumer;
import com.rabbitmq.client.amqp.ConsumerBuilder;
import com.rabbitmq.client.amqp.Resource;
+import java.math.BigDecimal;
import java.time.Instant;
import java.util.*;
+import org.apache.qpid.protonj2.types.*;
class AmqpConsumerBuilder implements ConsumerBuilder {
@@ -34,7 +36,7 @@ class AmqpConsumerBuilder implements ConsumerBuilder {
private Consumer.MessageHandler messageHandler;
private int initialCredits = 100;
private final List listeners = new ArrayList<>();
- private final Map filters = new LinkedHashMap<>();
+ private final Map filters = new LinkedHashMap<>();
private final Map properties = new LinkedHashMap<>();
private final StreamOptions streamOptions = new DefaultStreamOptions(this, this.filters);
private SubscriptionListener subscriptionListener = NO_OP_SUBSCRIPTION_LISTENER;
@@ -116,7 +118,7 @@ List listeners() {
return listeners;
}
- Map filters() {
+ Map filters() {
return this.filters;
}
@@ -133,32 +135,31 @@ public Consumer build() {
private static class DefaultStreamOptions implements StreamOptions {
- private final Map filters;
- private final ConsumerBuilder builder;
+ private final Map filters;
+ private final AmqpConsumerBuilder builder;
+ private final StreamFilterOptions filterOptions;
- private DefaultStreamOptions(ConsumerBuilder builder, Map filters) {
+ private DefaultStreamOptions(AmqpConsumerBuilder builder, Map filters) {
this.builder = builder;
this.filters = filters;
+ this.filterOptions = new DefaultStreamFilterOptions(this, filters);
}
@Override
public StreamOptions offset(long offset) {
- this.filters.put("rabbitmq:stream-offset-spec", offset);
- return this;
+ return this.filter("rabbitmq:stream-offset-spec", offset);
}
@Override
public StreamOptions offset(Instant timestamp) {
notNull(timestamp, "Timestamp offset cannot be null");
- this.offsetSpecification(Date.from(timestamp));
- return this;
+ return this.offsetSpecification(Date.from(timestamp));
}
@Override
public StreamOptions offset(StreamOffsetSpecification specification) {
notNull(specification, "Offset specification cannot be null");
- this.offsetSpecification(specification.name().toLowerCase(Locale.ENGLISH));
- return this;
+ return this.offsetSpecification(specification.name().toLowerCase(Locale.ENGLISH));
}
@Override
@@ -171,8 +172,7 @@ public StreamOptions offset(String interval) {
+ ". "
+ "Valid examples are: 1Y, 7D, 10m. See https://www.rabbitmq.com/docs/streams#retention.");
}
- this.offsetSpecification(interval);
- return this;
+ return this.offsetSpecification(interval);
}
@Override
@@ -180,14 +180,21 @@ public StreamOptions filterValues(String... values) {
if (values == null || values.length == 0) {
throw new IllegalArgumentException("At least one stream filter value must specified");
}
- this.filters.put("rabbitmq:stream-filter", Arrays.asList(values));
- return this;
+ return this.filter("rabbitmq:stream-filter", Arrays.asList(values));
}
@Override
public StreamOptions filterMatchUnfiltered(boolean matchUnfiltered) {
- this.filters.put("rabbitmq:stream-match-unfiltered", matchUnfiltered);
- return this;
+ return this.filter("rabbitmq:stream-match-unfiltered", matchUnfiltered);
+ }
+
+ @Override
+ public StreamFilterOptions filter() {
+ if (!this.builder.connection.filterExpressionsSupported()) {
+ throw new IllegalArgumentException(
+ "AMQP filter expressions requires at least RabbitMQ 4.1.0");
+ }
+ return this.filterOptions;
}
@Override
@@ -195,12 +202,268 @@ public ConsumerBuilder builder() {
return this.builder;
}
- private void offsetSpecification(Object value) {
- this.filters.put("rabbitmq:stream-offset-spec", value);
+ private StreamOptions offsetSpecification(Object value) {
+ return filter("rabbitmq:stream-offset-spec", value);
}
+
+ private StreamOptions filter(String key, Object value) {
+ AmqpConsumerBuilder.filter(this.filters, key, value);
+ return this;
+ }
+ }
+
+ private static void filter(Map filters, String filterName, Object value) {
+ filters.put(filterName, filterValue(filterName, value));
+ }
+
+ private static DescribedType filterValue(String filterName, Object value) {
+ return new UnknownDescribedType(Symbol.getSymbol(filterName), value);
}
- static StreamOptions streamOptions(Map filters) {
+ static StreamOptions streamOptions(Map filters) {
return new DefaultStreamOptions(null, filters);
}
+
+ private static class DefaultStreamFilterOptions implements StreamFilterOptions {
+
+ private final StreamOptions streamOptions;
+ private final Map filters;
+
+ private DefaultStreamFilterOptions(
+ StreamOptions streamOptions, Map filters) {
+ this.streamOptions = streamOptions;
+ this.filters = filters;
+ }
+
+ @Override
+ public StreamFilterOptions messageId(Object id) {
+ return propertyFilter("message-id", id);
+ }
+
+ @Override
+ public StreamFilterOptions messageId(String id) {
+ return propertyFilter("message-id", id);
+ }
+
+ @Override
+ public StreamFilterOptions messageId(long id) {
+ return propertyFilter("message-id", new UnsignedLong(id));
+ }
+
+ @Override
+ public StreamFilterOptions messageId(byte[] id) {
+ return propertyFilter("message-id", new Binary(id));
+ }
+
+ @Override
+ public StreamFilterOptions messageId(UUID id) {
+ return propertyFilter("message-id", id);
+ }
+
+ @Override
+ public StreamFilterOptions correlationId(Object correlationId) {
+ return propertyFilter("correlation-id", correlationId);
+ }
+
+ @Override
+ public StreamFilterOptions correlationId(String correlationId) {
+ return propertyFilter("correlation-id", correlationId);
+ }
+
+ @Override
+ public StreamFilterOptions correlationId(long correlationId) {
+ return propertyFilter("correlation-id", new UnsignedLong(correlationId));
+ }
+
+ @Override
+ public StreamFilterOptions correlationId(byte[] correlationId) {
+ return propertyFilter("correlation-id", new Binary(correlationId));
+ }
+
+ @Override
+ public StreamFilterOptions correlationId(UUID correlationId) {
+ return propertyFilter("correlation-id", correlationId);
+ }
+
+ @Override
+ public StreamFilterOptions userId(byte[] userId) {
+ return propertyFilter("user-id", new Binary(userId));
+ }
+
+ @Override
+ public StreamFilterOptions to(String to) {
+ return propertyFilter("to", to);
+ }
+
+ @Override
+ public StreamFilterOptions subject(String subject) {
+ return propertyFilter("subject", subject);
+ }
+
+ @Override
+ public StreamFilterOptions replyTo(String replyTo) {
+ return propertyFilter("reply-to", replyTo);
+ }
+
+ @Override
+ public StreamFilterOptions contentType(String contentType) {
+ return propertyFilter("content-type", Symbol.valueOf(contentType));
+ }
+
+ @Override
+ public StreamFilterOptions contentEncoding(String contentEncoding) {
+ return propertyFilter("content-encoding", Symbol.valueOf(contentEncoding));
+ }
+
+ @Override
+ public StreamFilterOptions absoluteExpiryTime(long absoluteExpiryTime) {
+ return propertyFilter("absolute-expiry-time", new Date(absoluteExpiryTime));
+ }
+
+ @Override
+ public StreamFilterOptions creationTime(long creationTime) {
+ return propertyFilter("creation-time", new Date(creationTime));
+ }
+
+ @Override
+ public StreamFilterOptions groupId(String groupId) {
+ return propertyFilter("group-id", groupId);
+ }
+
+ @Override
+ public StreamFilterOptions groupSequence(int groupSequence) {
+ return propertyFilter("group-sequence", UnsignedInteger.valueOf(groupSequence));
+ }
+
+ @Override
+ public StreamFilterOptions replyToGroupId(String groupId) {
+ return propertyFilter("reply-to-group-id", groupId);
+ }
+
+ @Override
+ public StreamFilterOptions property(String key, boolean value) {
+ return this.applicationPropertyFilter(key, value);
+ }
+
+ @Override
+ public StreamFilterOptions property(String key, byte value) {
+ return this.applicationPropertyFilter(key, value);
+ }
+
+ @Override
+ public StreamFilterOptions property(String key, short value) {
+ return this.applicationPropertyFilter(key, value);
+ }
+
+ @Override
+ public StreamFilterOptions property(String key, int value) {
+ return this.applicationPropertyFilter(key, value);
+ }
+
+ @Override
+ public StreamFilterOptions property(String key, long value) {
+ return this.applicationPropertyFilter(key, value);
+ }
+
+ @Override
+ public StreamFilterOptions propertyUnsigned(String key, byte value) {
+ return this.applicationPropertyFilter(key, UnsignedByte.valueOf(value));
+ }
+
+ @Override
+ public StreamFilterOptions propertyUnsigned(String key, short value) {
+ return this.applicationPropertyFilter(key, UnsignedShort.valueOf(value));
+ }
+
+ @Override
+ public StreamFilterOptions propertyUnsigned(String key, int value) {
+ return this.applicationPropertyFilter(key, UnsignedInteger.valueOf(value));
+ }
+
+ @Override
+ public StreamFilterOptions propertyUnsigned(String key, long value) {
+ return this.applicationPropertyFilter(key, UnsignedLong.valueOf(value));
+ }
+
+ @Override
+ public StreamFilterOptions property(String key, float value) {
+ return this.applicationPropertyFilter(key, value);
+ }
+
+ @Override
+ public StreamFilterOptions property(String key, double value) {
+ return this.applicationPropertyFilter(key, value);
+ }
+
+ @Override
+ public StreamFilterOptions propertyDecimal32(String key, BigDecimal value) {
+ return this.applicationPropertyFilter(key, new Decimal32(value));
+ }
+
+ @Override
+ public StreamFilterOptions propertyDecimal64(String key, BigDecimal value) {
+ return this.applicationPropertyFilter(key, new Decimal64(value));
+ }
+
+ @Override
+ public StreamFilterOptions propertyDecimal128(String key, BigDecimal value) {
+ return this.applicationPropertyFilter(key, new Decimal128(value));
+ }
+
+ @Override
+ public StreamFilterOptions property(String key, char value) {
+ return this.applicationPropertyFilter(key, value);
+ }
+
+ @Override
+ public StreamFilterOptions propertyTimestamp(String key, long value) {
+ return this.applicationPropertyFilter(key, new Date(value));
+ }
+
+ @Override
+ public StreamFilterOptions property(String key, UUID value) {
+ return this.applicationPropertyFilter(key, value);
+ }
+
+ @Override
+ public StreamFilterOptions property(String key, byte[] value) {
+ return this.applicationPropertyFilter(key, new Binary(value));
+ }
+
+ @Override
+ public StreamFilterOptions property(String key, String value) {
+ return this.applicationPropertyFilter(key, value);
+ }
+
+ @Override
+ public StreamFilterOptions propertySymbol(String key, String value) {
+ return this.applicationPropertyFilter(key, Symbol.valueOf(value));
+ }
+
+ @Override
+ public StreamOptions stream() {
+ return this.streamOptions;
+ }
+
+ private StreamFilterOptions propertyFilter(String propertyKey, Object propertyValue) {
+ Map filter = filter("amqp:properties-filter");
+ filter.put(Symbol.valueOf(propertyKey), propertyValue);
+ return this;
+ }
+
+ private StreamFilterOptions applicationPropertyFilter(
+ String propertyKey, Object propertyValue) {
+ Map filter = filter("amqp:application-properties-filter");
+ filter.put(propertyKey, propertyValue);
+ return this;
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map filter(String filterName) {
+ DescribedType type =
+ this.filters.computeIfAbsent(
+ filterName, fName -> filterValue(fName, new LinkedHashMap<>()));
+ return (Map) type.getDescribed();
+ }
+ }
}
diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java
index e0d62c996..92f37cf22 100644
--- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java
+++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java
@@ -29,7 +29,7 @@
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.types.*;
-class AmqpMessage implements Message {
+final class AmqpMessage implements Message {
private static final byte[] EMPTY_BODY = new byte[0];
@@ -241,7 +241,7 @@ public Message groupSequence(int groupSequence) {
@Override
public Message replyToGroupId(String groupId) {
- callOnDelegate(m -> replyToGroupId(groupId));
+ callOnDelegate(m -> m.replyToGroupId(groupId));
return this;
}
diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java b/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java
index 8c3744746..b846992a0 100644
--- a/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java
+++ b/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java
@@ -224,6 +224,14 @@ static boolean is4_0_OrMore(String brokerVersion) {
return versionCompare(currentVersion(brokerVersion), "4.0.0") >= 0;
}
+ static boolean is4_1_OrMore(String brokerVersion) {
+ return versionCompare(currentVersion(brokerVersion), "4.1.0") >= 0;
+ }
+
+ static boolean supportFilterExpressions(String brokerVersion) {
+ return is4_1_OrMore(brokerVersion);
+ }
+
static final class ObservationConnectionInfo implements ObservationCollector.ConnectionInfo {
private final String address;
diff --git a/src/test/java/com/rabbitmq/client/amqp/docs/WebsiteDocumentation.java b/src/test/java/com/rabbitmq/client/amqp/docs/WebsiteDocumentation.java
index b3b10a4db..280f5efca 100644
--- a/src/test/java/com/rabbitmq/client/amqp/docs/WebsiteDocumentation.java
+++ b/src/test/java/com/rabbitmq/client/amqp/docs/WebsiteDocumentation.java
@@ -146,6 +146,24 @@ void consumingStreamFiltering() {
}
+ void consumingAmqpFilterExpressions() {
+ Connection connection = null;
+
+ Consumer consumer = connection.consumerBuilder()
+ .queue("some-stream")
+ .stream()
+ .filter()
+ .subject("$p:foo") // subject starts with 'foo'
+ .property("k1", "v1") // 'k1' application property equals to 'v1'
+ .stream()
+ .builder()
+ .messageHandler((context, message) -> {
+ // message processing
+ })
+ .build();
+
+ }
+
void management() {
Connection connection = null;
diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTestInfrastructure.java b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTestInfrastructure.java
new file mode 100644
index 000000000..af9019515
--- /dev/null
+++ b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTestInfrastructure.java
@@ -0,0 +1,27 @@
+// Copyright (c) 2024 Broadcom. All Rights Reserved.
+// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+package com.rabbitmq.client.amqp.impl;
+
+import java.lang.annotation.*;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@ExtendWith(AmqpTestInfrastructureExtension.class)
+public @interface AmqpTestInfrastructure {}
diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTestInfrastructureExtension.java b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTestInfrastructureExtension.java
index ad57a0601..72d31e824 100644
--- a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTestInfrastructureExtension.java
+++ b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTestInfrastructureExtension.java
@@ -22,6 +22,7 @@
import com.rabbitmq.client.amqp.BackOffDelayPolicy;
import com.rabbitmq.client.amqp.Connection;
import com.rabbitmq.client.amqp.Environment;
+import java.lang.annotation.*;
import java.lang.reflect.Field;
import org.junit.jupiter.api.extension.*;
diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java b/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java
index 406a52f04..f5ff2c463 100644
--- a/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java
+++ b/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java
@@ -23,10 +23,12 @@
import com.rabbitmq.client.amqp.Message;
import java.time.Duration;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import org.apache.qpid.protonj2.types.UnsignedLong;
import org.assertj.core.api.AbstractObjectAssert;
final class Assertions {
@@ -226,10 +228,99 @@ private MessageAssert(Message message) {
super(message, MessageAssert.class);
}
+ MessageAssert hasId(long id) {
+ return this.hasId(new UnsignedLong(id));
+ }
+
MessageAssert hasId(Object id) {
+ return hasField("id", actual.messageId(), id);
+ }
+
+ MessageAssert hasCorrelationId(long correlationId) {
+ return this.hasCorrelationId(new UnsignedLong(correlationId));
+ }
+
+ MessageAssert hasCorrelationId(Object id) {
+ return hasField("correlation-id", actual.correlationId(), id);
+ }
+
+ MessageAssert hasUserId(byte[] userId) {
+ isNotNull();
+ org.assertj.core.api.Assertions.assertThat(actual.userId()).isEqualTo(userId);
+ return this;
+ }
+
+ MessageAssert hasTo(String to) {
+ return hasField("to", actual.to(), to);
+ }
+
+ MessageAssert hasSubject(String subject) {
+ return hasField("subject", actual.subject(), subject);
+ }
+
+ MessageAssert hasReplyTo(String replyTo) {
+ return hasField("reply-to", actual.replyTo(), replyTo);
+ }
+
+ MessageAssert hasContentType(String contentType) {
+ return hasField("content-type", actual.contentType(), contentType);
+ }
+
+ MessageAssert hasContentEncoding(String contentEncoding) {
+ return hasField("content-encoding", actual.contentEncoding(), contentEncoding);
+ }
+
+ MessageAssert hasAbsoluteExpiryTime(long absoluteExpiryTime) {
+ isNotNull();
+ org.assertj.core.api.Assertions.assertThat(actual.absoluteExpiryTime())
+ .isEqualTo(absoluteExpiryTime);
+ return this;
+ }
+
+ MessageAssert hasCreationTime(long creationTime) {
+ isNotNull();
+ org.assertj.core.api.Assertions.assertThat(actual.creationTime()).isEqualTo(creationTime);
+ return this;
+ }
+
+ MessageAssert hasGroupId(String groupId) {
+ return hasField("group-id", actual.groupId(), groupId);
+ }
+
+ MessageAssert hasGroupSequence(long groupSequence) {
+ isNotNull();
+ org.assertj.core.api.Assertions.assertThat(actual.groupSequence()).isEqualTo(groupSequence);
+ return this;
+ }
+
+ MessageAssert hasReplyToGroupId(String groupId) {
+ return hasField("reply-to-group-id", actual.replyToGroupId(), groupId);
+ }
+
+ MessageAssert hasBody(byte[] body) {
+ isNotNull();
+ org.assertj.core.api.Assertions.assertThat(actual.body()).isEqualTo(body);
+ return this;
+ }
+
+ MessageAssert hasProperty(String key) {
isNotNull();
- if (!actual.messageId().equals(id)) {
- fail("Message ID should be '%s' but is '%s'", id, actual.messageId());
+ if (!actual.hasProperty(key)) {
+ fail("Message should have property '%s' but does not", key);
+ }
+ return this;
+ }
+
+ MessageAssert hasProperty(String key, Object value) {
+ if (key == null || value == null) {
+ throw new IllegalArgumentException();
+ }
+ isNotNull();
+ hasProperty(key);
+ if (!value.equals(this.actual.property(key))) {
+ fail(
+ "Message should have property '%s = %s' but has '%s = %s'",
+ key, value, key, this.actual.property(key));
}
return this;
}
@@ -263,6 +354,14 @@ MessageAssert doesNotHaveAnnotation(String key) {
}
return this;
}
+
+ private MessageAssert hasField(String fieldLabel, Object value, Object expected) {
+ isNotNull();
+ if (!Objects.equals(value, expected)) {
+ fail("Field '%s' should be '%s' but is '%s'", fieldLabel, expected, value);
+ }
+ return this;
+ }
}
static class ConnectionAssert extends AbstractObjectAssert {
diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java
index fd041b268..df7936551 100644
--- a/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java
+++ b/src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java
@@ -18,217 +18,209 @@
package com.rabbitmq.client.amqp.impl;
import static com.rabbitmq.client.amqp.ConsumerBuilder.StreamOffsetSpecification.*;
+import static com.rabbitmq.client.amqp.Management.QueueType.STREAM;
+import static com.rabbitmq.client.amqp.impl.Assertions.assertThat;
+import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_1_0;
import static com.rabbitmq.client.amqp.impl.TestUtils.sync;
+import static com.rabbitmq.client.amqp.impl.TestUtils.waitUntilStable;
+import static java.nio.charset.StandardCharsets.*;
import static java.util.stream.IntStream.range;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import com.rabbitmq.client.amqp.*;
+import com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersionAtLeast;
import com.rabbitmq.client.amqp.impl.TestUtils.Sync;
import java.time.Duration;
import java.time.Instant;
-import java.util.List;
-import java.util.SortedSet;
+import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import org.assertj.core.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInfo;
-import org.junit.jupiter.api.extension.ExtendWith;
+import java.util.function.UnaryOperator;
+import net.jqwik.api.Arbitraries;
+import net.jqwik.api.arbitraries.ArrayArbitrary;
+import net.jqwik.api.arbitraries.IntegerArbitrary;
+import net.jqwik.api.arbitraries.StringArbitrary;
+import org.apache.qpid.protonj2.types.Binary;
+import org.apache.qpid.protonj2.types.Symbol;
+import org.junit.jupiter.api.*;
-@ExtendWith(AmqpTestInfrastructureExtension.class)
+@AmqpTestInfrastructure
public class SourceFiltersTest {
Connection connection;
String name;
+ ArrayArbitrary binaryArbitrary;
+ StringArbitrary stringArbitrary;
+ IntegerArbitrary integerArbitrary;
@BeforeEach
void init(TestInfo info) {
this.name = TestUtils.name(info);
+ connection.management().queue(this.name).type(STREAM).declare();
+ binaryArbitrary = Arbitraries.bytes().array(byte[].class).ofMinSize(10).ofMaxSize(20);
+ stringArbitrary = Arbitraries.strings().ofMinLength(10).ofMaxLength(20);
+ integerArbitrary = Arbitraries.integers();
+ }
+
+ @AfterEach
+ void tearDown() {
+ connection.management().queueDeletion().delete(this.name);
}
@Test
void streamConsumerOptionsOffsetLong() {
- connection.management().queue(name).type(Management.QueueType.STREAM).declare();
- try {
- int messageCount = 100;
- publish(messageCount);
-
- SortedSet offsets = new ConcurrentSkipListSet<>();
- Sync consumeSync = sync(messageCount);
- Consumer consumer =
- connection.consumerBuilder().queue(name).stream()
- .offset(0L)
- .builder()
- .messageHandler(
- (ctx, msg) -> {
- ctx.accept();
- offsets.add((Long) msg.annotation("x-stream-offset"));
- consumeSync.down();
- })
- .build();
- com.rabbitmq.client.amqp.impl.Assertions.assertThat(consumeSync).completes();
- SortedSet offsetTail = offsets.tailSet(offsets.last() / 2);
- Assertions.assertThat(offsetTail.first()).isPositive();
- consumer.close();
- consumeSync.reset(offsetTail.size());
- consumer =
- connection.consumerBuilder().stream()
- .offset(offsetTail.first())
- .builder()
- .queue(name)
- .messageHandler(
- (ctx, msg) -> {
- ctx.accept();
- consumeSync.down();
- })
- .build();
- com.rabbitmq.client.amqp.impl.Assertions.assertThat(consumeSync).completes();
- consumer.close();
- } finally {
- connection.management().queueDeletion().delete(name);
- }
+ int messageCount = 100;
+ publish(messageCount);
+
+ SortedSet offsets = new ConcurrentSkipListSet<>();
+ Sync consumeSync = sync(messageCount);
+ Consumer consumer =
+ connection.consumerBuilder().queue(name).stream()
+ .offset(0L)
+ .builder()
+ .messageHandler(
+ (ctx, msg) -> {
+ ctx.accept();
+ offsets.add((Long) msg.annotation("x-stream-offset"));
+ consumeSync.down();
+ })
+ .build();
+ assertThat(consumeSync).completes();
+ SortedSet offsetTail = offsets.tailSet(offsets.last() / 2);
+ assertThat(offsetTail.first()).isPositive();
+ consumer.close();
+ consumeSync.reset(offsetTail.size());
+ consumer =
+ connection.consumerBuilder().stream()
+ .offset(offsetTail.first())
+ .builder()
+ .queue(name)
+ .messageHandler(
+ (ctx, msg) -> {
+ ctx.accept();
+ consumeSync.down();
+ })
+ .build();
+ assertThat(consumeSync).completes();
+ consumer.close();
}
@Test
void streamConsumerOptionsOffsetFirst() {
- connection.management().queue(name).type(Management.QueueType.STREAM).declare();
- try {
- int messageCount = 100;
- publish(messageCount);
-
- Sync consumeSync = sync(messageCount);
- Consumer consumer =
- connection.consumerBuilder().queue(name).stream()
- .offset(FIRST)
- .builder()
- .messageHandler(
- (ctx, msg) -> {
- ctx.accept();
- consumeSync.down();
- })
- .build();
- com.rabbitmq.client.amqp.impl.Assertions.assertThat(consumeSync).completes();
- consumer.close();
- } finally {
- connection.management().queueDeletion().delete(name);
- }
+ int messageCount = 100;
+ publish(messageCount);
+
+ Sync consumeSync = sync(messageCount);
+ Consumer consumer =
+ connection.consumerBuilder().queue(name).stream()
+ .offset(FIRST)
+ .builder()
+ .messageHandler(
+ (ctx, msg) -> {
+ ctx.accept();
+ consumeSync.down();
+ })
+ .build();
+ assertThat(consumeSync).completes();
+ consumer.close();
}
@Test
void streamConsumerOptionsOffsetLast() {
- connection.management().queue(name).type(Management.QueueType.STREAM).declare();
- try {
- int messageCount = 100;
- publish(messageCount);
-
- Sync consumeSync = sync(1);
- AtomicLong firstOffset = new AtomicLong(-1);
- Consumer consumer =
- connection.consumerBuilder().queue(name).stream()
- .offset(LAST)
- .builder()
- .messageHandler(
- (ctx, msg) -> {
- ctx.accept();
- firstOffset.compareAndSet(-1, (Long) msg.annotation("x-stream-offset"));
- consumeSync.down();
- })
- .build();
- com.rabbitmq.client.amqp.impl.Assertions.assertThat(consumeSync).completes();
- Assertions.assertThat(firstOffset).hasPositiveValue();
- consumer.close();
- } finally {
- connection.management().queueDeletion().delete(name);
- }
+ int messageCount = 100;
+ publish(messageCount);
+
+ Sync consumeSync = sync(1);
+ AtomicLong firstOffset = new AtomicLong(-1);
+ Consumer consumer =
+ connection.consumerBuilder().queue(name).stream()
+ .offset(LAST)
+ .builder()
+ .messageHandler(
+ (ctx, msg) -> {
+ ctx.accept();
+ firstOffset.compareAndSet(-1, (Long) msg.annotation("x-stream-offset"));
+ consumeSync.down();
+ })
+ .build();
+ assertThat(consumeSync).completes();
+ assertThat(firstOffset).hasPositiveValue();
+ consumer.close();
}
@Test
void streamConsumerOptionsOffsetNext() {
- connection.management().queue(name).type(Management.QueueType.STREAM).declare();
- try {
- int messageCount = 100;
- publish(messageCount);
-
- Sync consumeSync = sync(messageCount);
- Sync openSync = sync(1);
- AtomicLong firstOffset = new AtomicLong(-1);
- Consumer consumer =
- connection.consumerBuilder().queue(name).stream()
- .offset(NEXT)
- .builder()
- .listeners(
- context -> {
- if (context.currentState() == Resource.State.OPEN) {
- openSync.down();
- }
- })
- .messageHandler(
- (ctx, msg) -> {
- ctx.accept();
- firstOffset.compareAndSet(-1, (Long) msg.annotation("x-stream-offset"));
- consumeSync.down();
- })
- .build();
- com.rabbitmq.client.amqp.impl.Assertions.assertThat(openSync).completes();
- publish(messageCount);
- com.rabbitmq.client.amqp.impl.Assertions.assertThat(consumeSync).completes();
- Assertions.assertThat(firstOffset).hasPositiveValue();
- consumer.close();
- } finally {
- connection.management().queueDeletion().delete(name);
- }
+ int messageCount = 100;
+ publish(messageCount);
+
+ Sync consumeSync = sync(messageCount);
+ Sync openSync = sync(1);
+ AtomicLong firstOffset = new AtomicLong(-1);
+ Consumer consumer =
+ connection.consumerBuilder().queue(name).stream()
+ .offset(NEXT)
+ .builder()
+ .listeners(
+ context -> {
+ if (context.currentState() == Resource.State.OPEN) {
+ openSync.down();
+ }
+ })
+ .messageHandler(
+ (ctx, msg) -> {
+ ctx.accept();
+ firstOffset.compareAndSet(-1, (Long) msg.annotation("x-stream-offset"));
+ consumeSync.down();
+ })
+ .build();
+ assertThat(openSync).completes();
+ publish(messageCount);
+ assertThat(consumeSync).completes();
+ assertThat(firstOffset).hasPositiveValue();
+ consumer.close();
}
@Test
void streamConsumerOptionsOffsetTimestamp() {
- connection.management().queue(name).type(Management.QueueType.STREAM).declare();
- try {
- int messageCount = 100;
- publish(messageCount);
-
- Sync consumeSync = sync(messageCount);
- Consumer consumer =
- connection.consumerBuilder().queue(name).stream()
- .offset(Instant.now().minus(Duration.ofMinutes(10)))
- .builder()
- .messageHandler(
- (ctx, msg) -> {
- ctx.accept();
- consumeSync.down();
- })
- .build();
- com.rabbitmq.client.amqp.impl.Assertions.assertThat(consumeSync).completes();
- consumer.close();
- } finally {
- connection.management().queueDeletion().delete(name);
- }
+ int messageCount = 100;
+ publish(messageCount);
+
+ Sync consumeSync = sync(messageCount);
+ Consumer consumer =
+ connection.consumerBuilder().queue(name).stream()
+ .offset(Instant.now().minus(Duration.ofMinutes(10)))
+ .builder()
+ .messageHandler(
+ (ctx, msg) -> {
+ ctx.accept();
+ consumeSync.down();
+ })
+ .build();
+ assertThat(consumeSync).completes();
+ consumer.close();
}
@Test
void streamConsumerOptionsOffsetInterval() {
- connection.management().queue(name).type(Management.QueueType.STREAM).declare();
- try {
- int messageCount = 100;
- publish(messageCount);
-
- Sync consumeSync = sync(messageCount);
- Consumer consumer =
- connection.consumerBuilder().queue(name).stream()
- .offset("10m")
- .builder()
- .messageHandler(
- (ctx, msg) -> {
- ctx.accept();
- consumeSync.down();
- })
- .build();
- com.rabbitmq.client.amqp.impl.Assertions.assertThat(consumeSync).completes();
- consumer.close();
- } finally {
- connection.management().queueDeletion().delete(name);
- }
+ int messageCount = 100;
+ publish(messageCount);
+
+ Sync consumeSync = sync(messageCount);
+ Consumer consumer =
+ connection.consumerBuilder().queue(name).stream()
+ .offset("10m")
+ .builder()
+ .messageHandler(
+ (ctx, msg) -> {
+ ctx.accept();
+ consumeSync.down();
+ })
+ .build();
+ assertThat(consumeSync).completes();
+ consumer.close();
}
@Test
@@ -239,72 +231,287 @@ void streamConsumerOptionsOffsetIntervalWithInvalidSyntaxShouldThrow() {
@Test
void streamFiltering() {
- connection.management().queue(name).type(Management.QueueType.STREAM).declare();
- try {
- int messageWaveCount = 100;
- List waves = List.of("apple", "orange", "", "banana");
- waves.forEach(v -> publish(messageWaveCount, v));
- int waveCount = waves.size();
-
- AtomicInteger receivedCount = new AtomicInteger(0);
- Consumer consumer =
- connection.consumerBuilder().queue(name).stream()
- .offset(FIRST)
- .filterValues("banana")
- .filterMatchUnfiltered(false)
- .builder()
- .messageHandler(
- (ctx, msg) -> {
- receivedCount.incrementAndGet();
- ctx.accept();
- })
- .build();
- TestUtils.waitUntilStable(receivedCount::get);
- Assertions.assertThat(receivedCount)
- .hasValueGreaterThanOrEqualTo(messageWaveCount)
- .hasValueLessThan(waveCount * messageWaveCount);
- consumer.close();
-
- receivedCount.set(0);
- consumer =
- connection.consumerBuilder().queue(name).stream()
- .offset(FIRST)
- .filterValues("banana")
- .filterMatchUnfiltered(true)
- .builder()
- .messageHandler(
- (ctx, msg) -> {
- receivedCount.incrementAndGet();
- ctx.accept();
- })
- .build();
- TestUtils.waitUntilStable(receivedCount::get);
- Assertions.assertThat(receivedCount)
- .hasValueGreaterThanOrEqualTo(2 * messageWaveCount)
- .hasValueLessThan(waveCount * messageWaveCount);
- consumer.close();
- } finally {
- connection.management().queueDeletion().delete(name);
- }
+ int messageWaveCount = 100;
+ List waves = List.of("apple", "orange", "", "banana");
+ waves.forEach(v -> publish(messageWaveCount, v));
+ int waveCount = waves.size();
+
+ AtomicInteger receivedCount = new AtomicInteger(0);
+ Consumer consumer =
+ connection.consumerBuilder().queue(name).stream()
+ .offset(FIRST)
+ .filterValues("banana")
+ .filterMatchUnfiltered(false)
+ .builder()
+ .messageHandler(
+ (ctx, msg) -> {
+ receivedCount.incrementAndGet();
+ ctx.accept();
+ })
+ .build();
+ waitUntilStable(receivedCount::get);
+ assertThat(receivedCount)
+ .hasValueGreaterThanOrEqualTo(messageWaveCount)
+ .hasValueLessThan(waveCount * messageWaveCount);
+ consumer.close();
+
+ receivedCount.set(0);
+ consumer =
+ connection.consumerBuilder().queue(name).stream()
+ .offset(FIRST)
+ .filterValues("banana")
+ .filterMatchUnfiltered(true)
+ .builder()
+ .messageHandler(
+ (ctx, msg) -> {
+ receivedCount.incrementAndGet();
+ ctx.accept();
+ })
+ .build();
+ waitUntilStable(receivedCount::get);
+ assertThat(receivedCount)
+ .hasValueGreaterThanOrEqualTo(2 * messageWaveCount)
+ .hasValueLessThan(waveCount * messageWaveCount);
+ consumer.close();
+ }
+
+ @Test
+ @BrokerVersionAtLeast(RABBITMQ_4_1_0)
+ void filterExpressionApplicationProperties() {
+ int messageCount = 10;
+ UUID uuid = UUID.randomUUID();
+ long now = System.currentTimeMillis();
+ byte[] binary = binaryArbitrary.sample();
+ publish(messageCount, msg -> msg.property("foo", true));
+ publish(messageCount, msg -> msg.property("foo", 42));
+ publish(messageCount, msg -> msg.property("foo", 42.1));
+ publish(messageCount, msg -> msg.property("foo", now));
+ publish(messageCount, msg -> msg.property("foo", uuid));
+ publish(messageCount, msg -> msg.property("foo", binary));
+ publish(messageCount, msg -> msg.property("foo", "bar"));
+ publish(messageCount, msg -> msg.property("foo", "baz"));
+ publish(messageCount, msg -> msg.property("foo", "bar"));
+ publish(messageCount, msg -> msg.propertySymbol("foo", "symbol"));
+ publish(messageCount, msg -> msg.property("foo", "bar").property("k1", 42));
+
+ Collection msgs = consume(messageCount, options -> options.property("foo", true));
+ msgs.forEach(m -> assertThat(m).hasProperty("foo", true));
+
+ msgs = consume(messageCount, options -> options.property("foo", 42));
+ msgs.forEach(m -> assertThat(m).hasProperty("foo", 42));
+
+ msgs = consume(messageCount, options -> options.property("foo", 42.1));
+ msgs.forEach(m -> assertThat(m).hasProperty("foo", 42.1));
+
+ msgs = consume(messageCount, options -> options.propertyTimestamp("foo", now));
+ msgs.forEach(m -> assertThat(m).hasProperty("foo", now));
+
+ msgs = consume(messageCount, options -> options.property("foo", uuid));
+ msgs.forEach(m -> assertThat(m).hasProperty("foo", uuid));
+
+ msgs = consume(messageCount, options -> options.property("foo", binary));
+ msgs.forEach(m -> assertThat(m).hasProperty("foo", new Binary(binary)));
+
+ msgs = consume(messageCount, options -> options.property("foo", "baz"));
+ msgs.forEach(m -> assertThat(m).hasProperty("foo", "baz"));
+
+ msgs = consume(messageCount * 3, options -> options.property("foo", "bar"));
+ msgs.forEach(m -> assertThat(m).hasProperty("foo", "bar"));
+
+ msgs = consume(messageCount * 4, options -> options.property("foo", "$p:b"));
+ assertThat(msgs).allMatch(m -> m.property("foo").toString().startsWith("b"));
+
+ msgs = consume(messageCount, options -> options.propertySymbol("foo", "symbol"));
+ msgs.forEach(m -> assertThat(m).hasProperty("foo", Symbol.valueOf("symbol")));
+
+ msgs = consume(messageCount, options -> options.property("foo", "bar").property("k1", 42));
+ msgs.forEach(m -> assertThat(m).hasProperty("foo", "bar").hasProperty("k1", 42));
+ }
+
+ @Test
+ @BrokerVersionAtLeast(RABBITMQ_4_1_0)
+ void filterExpressionProperties() {
+ int messageCount = 10;
+ byte[] userId = "guest".getBytes(UTF_8);
+ long now = System.currentTimeMillis();
+ long later = now + Duration.ofMinutes(10).toMillis();
+ UUID uuid = UUID.randomUUID();
+ publish(messageCount, msg -> msg.messageId(42));
+ publish(messageCount, msg -> msg.messageId(42 * 2));
+ publish(messageCount, msg -> msg.messageId(uuid));
+ publish(messageCount, msg -> msg.correlationId(43));
+ publish(messageCount, msg -> msg.correlationId(43 * 2));
+ publish(messageCount, msg -> msg.correlationId(uuid));
+ publish(messageCount, msg -> msg.userId(userId));
+ publish(messageCount, msg -> msg.to("to foo bar"));
+ publish(messageCount, msg -> msg.to("to foo baz"));
+ publish(messageCount, msg -> msg.subject("subject foo bar"));
+ publish(messageCount, msg -> msg.subject("subject foo baz"));
+ publish(messageCount, msg -> msg.replyTo("reply-to foo bar"));
+ publish(messageCount, msg -> msg.replyTo("reply-to foo baz"));
+ publish(messageCount, msg -> msg.contentType("text/plain"));
+ publish(messageCount, msg -> msg.contentType("text/html"));
+ publish(messageCount, msg -> msg.contentEncoding("gzip"));
+ publish(messageCount, msg -> msg.contentEncoding("zstd"));
+ publish(messageCount, msg -> msg.absoluteExpiryTime(now));
+ publish(messageCount, msg -> msg.absoluteExpiryTime(later));
+ publish(messageCount, msg -> msg.creationTime(now));
+ publish(messageCount, msg -> msg.creationTime(later));
+ publish(messageCount, msg -> msg.groupId("group-id foo bar"));
+ publish(messageCount, msg -> msg.groupId("group-id foo baz"));
+ publish(messageCount, msg -> msg.groupSequence(44));
+ publish(messageCount, msg -> msg.groupSequence(44 * 2));
+ publish(messageCount, msg -> msg.replyToGroupId("reply-to-group-id foo bar"));
+ publish(messageCount, msg -> msg.replyToGroupId("reply-to-group-id foo baz"));
+
+ Collection msgs = consume(messageCount, options -> options.messageId(42));
+ msgs.forEach(m -> assertThat(m).hasId(42));
+
+ msgs = consume(messageCount, options -> options.messageId(uuid));
+ msgs.forEach(m -> assertThat(m).hasId(uuid));
+
+ msgs = consume(messageCount, options -> options.correlationId(43));
+ msgs.forEach(m -> assertThat(m).hasCorrelationId(43));
+
+ msgs = consume(messageCount, options -> options.correlationId(uuid));
+ msgs.forEach(m -> assertThat(m).hasCorrelationId(uuid));
+
+ msgs = consume(messageCount, options -> options.userId(userId));
+ msgs.forEach(m -> assertThat(m).hasUserId(userId));
+
+ msgs = consume(messageCount, options -> options.to("to foo bar"));
+ msgs.forEach(m -> assertThat(m).hasTo("to foo bar"));
+
+ msgs = consume(messageCount, options -> options.subject("subject foo bar"));
+ msgs.forEach(m -> assertThat(m).hasSubject("subject foo bar"));
+
+ msgs = consume(messageCount, options -> options.replyTo("reply-to foo bar"));
+ msgs.forEach(m -> assertThat(m).hasReplyTo("reply-to foo bar"));
+
+ msgs = consume(messageCount, options -> options.contentType("text/html"));
+ msgs.forEach(m -> assertThat(m).hasContentType("text/html"));
+
+ msgs = consume(messageCount, options -> options.contentEncoding("zstd"));
+ msgs.forEach(m -> assertThat(m).hasContentEncoding("zstd"));
+
+ msgs = consume(messageCount, options -> options.absoluteExpiryTime(later));
+ msgs.forEach(m -> assertThat(m).hasAbsoluteExpiryTime(later));
+
+ msgs = consume(messageCount, options -> options.creationTime(later));
+ msgs.forEach(m -> assertThat(m).hasCreationTime(later));
+
+ msgs = consume(messageCount, options -> options.groupId("group-id foo baz"));
+ msgs.forEach(m -> assertThat(m).hasGroupId("group-id foo baz"));
+
+ msgs = consume(messageCount, options -> options.groupSequence(44));
+ msgs.forEach(m -> assertThat(m).hasGroupSequence(44));
+
+ msgs = consume(messageCount, options -> options.replyToGroupId("reply-to-group-id foo baz"));
+ msgs.forEach(m -> assertThat(m).hasReplyToGroupId("reply-to-group-id foo baz"));
+ }
+
+ @Test
+ @BrokerVersionAtLeast(RABBITMQ_4_1_0)
+ void filterExpressionsPropertiesAndApplicationProperties() {
+ int messageCount = 10;
+ String subject = stringArbitrary.sample();
+ String appKey = stringArbitrary.sample();
+ int appValue = integerArbitrary.sample();
+ byte[] body1 = binaryArbitrary.sample();
+ byte[] body2 = binaryArbitrary.sample();
+ byte[] body3 = binaryArbitrary.sample();
+
+ publish(messageCount, msg -> msg.subject(subject).body(body1));
+ publish(messageCount, msg -> msg.property(appKey, appValue).body(body2));
+ publish(messageCount, msg -> msg.subject(subject).property(appKey, appValue).body(body3));
+
+ List msgs = consume(messageCount * 2, options -> options.subject(subject));
+ msgs.subList(0, messageCount).forEach(m -> assertThat(m).hasSubject(subject).hasBody(body1));
+ msgs.subList(messageCount, messageCount * 2)
+ .forEach(m -> assertThat(m).hasSubject(subject).hasBody(body3));
+
+ msgs = consume(messageCount * 2, options -> options.property(appKey, appValue));
+ msgs.subList(0, messageCount)
+ .forEach(m -> assertThat(m).hasProperty(appKey, appValue).hasBody(body2));
+ msgs.subList(messageCount, messageCount * 2)
+ .forEach(m -> assertThat(m).hasProperty(appKey, appValue).hasBody(body3));
+
+ msgs = consume(messageCount, options -> options.subject(subject).property(appKey, appValue));
+ msgs.subList(0, messageCount)
+ .forEach(
+ m -> assertThat(m).hasSubject(subject).hasProperty(appKey, appValue).hasBody(body3));
+ }
+
+ @Test
+ @BrokerVersionAtLeast(RABBITMQ_4_1_0)
+ void filterExpressionFilterFewMessagesFromManyToTestFlowControl() {
+ String groupId = stringArbitrary.sample();
+ publish(1, m -> m.groupId(groupId));
+ publish(1000);
+ publish(1, m -> m.groupId(groupId));
+
+ List msgs = consume(2, m -> m.groupId(groupId));
+ msgs.forEach(m -> assertThat(m).hasGroupId(groupId));
+ }
+
+ @Test
+ @BrokerVersionAtLeast(RABBITMQ_4_1_0)
+ void filterExpressionStringModifier() {
+ publish(1, m -> m.subject("abc 123"));
+ publish(1, m -> m.subject("foo bar"));
+ publish(1, m -> m.subject("ab 12"));
+
+ List msgs = consume(2, m -> m.subject("$p:ab"));
+ msgs.forEach(m -> assertThat(m.subject()).startsWith("ab"));
+
+ msgs = consume(1, m -> m.subject("$s:bar"));
+ msgs.forEach(m -> assertThat(m).hasSubject("foo bar"));
}
void publish(int messageCount) {
- this.publish(messageCount, null);
+ this.publish(messageCount, UnaryOperator.identity());
}
void publish(int messageCount, String filterValue) {
+ publish(messageCount, msg -> msg.annotation("x-stream-filter-value", filterValue));
+ }
+
+ void publish(int messageCount, UnaryOperator messageLogic) {
try (Publisher publisher = connection.publisherBuilder().queue(name).build()) {
Sync publishSync = sync(messageCount);
+ Publisher.Callback callback = ctx -> publishSync.down();
range(0, messageCount)
- .forEach(
- ignored -> {
- Message message = publisher.message();
- if (filterValue != null && !filterValue.isBlank()) {
- message.annotation("x-stream-filter-value", filterValue);
- }
- publisher.publish(message, ctx -> publishSync.down());
- });
- com.rabbitmq.client.amqp.impl.Assertions.assertThat(publishSync).completes();
+ .forEach(ignored -> publisher.publish(messageLogic.apply(publisher.message()), callback));
+ assertThat(publishSync).completes();
}
}
+
+ List consume(
+ int expectedMessageCount,
+ java.util.function.Consumer filterOptions) {
+ Queue messages = new LinkedBlockingQueue<>();
+ Sync consumedSync = sync(expectedMessageCount);
+ AtomicInteger receivedMessageCount = new AtomicInteger();
+ ConsumerBuilder builder =
+ connection
+ .consumerBuilder()
+ .queue(this.name)
+ .messageHandler(
+ (ctx, msg) -> {
+ messages.add(msg);
+ receivedMessageCount.incrementAndGet();
+ consumedSync.down();
+ ctx.accept();
+ });
+
+ filterOptions.accept(builder.stream().offset(FIRST).filter());
+
+ try (Consumer ignored = builder.build()) {
+ assertThat(consumedSync).completes();
+ waitUntilStable(receivedMessageCount::get, Duration.ofMillis(50));
+ assertThat(receivedMessageCount).hasValue(expectedMessageCount);
+ }
+
+ return List.copyOf(messages);
+ }
}
diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java b/src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java
index a56924cf1..017dda68d 100644
--- a/src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java
+++ b/src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java
@@ -34,7 +34,8 @@ public final class TestConditions {
private TestConditions() {}
public enum BrokerVersion {
- RABBITMQ_4_0_3("4.0.3");
+ RABBITMQ_4_0_3("4.0.3"),
+ RABBITMQ_4_1_0("4.1.0");
final String value;
diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/TestUtils.java b/src/test/java/com/rabbitmq/client/amqp/impl/TestUtils.java
index 7f29caf86..40552d18a 100644
--- a/src/test/java/com/rabbitmq/client/amqp/impl/TestUtils.java
+++ b/src/test/java/com/rabbitmq/client/amqp/impl/TestUtils.java
@@ -58,8 +58,11 @@ static void submitTask(Runnable task) {
}
static T waitUntilStable(Supplier call) {
+ return waitUntilStable(call, Duration.ofMillis(200));
+ }
+
+ static T waitUntilStable(Supplier call, Duration waitTime) {
Duration timeout = Duration.ofSeconds(10);
- Duration waitTime = Duration.ofMillis(200);
Duration waitedTime = Duration.ZERO;
T newValue = null;
while (waitedTime.compareTo(timeout) <= 0) {