Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.3.0.BUILD-SNAPSHOT</version>
<version>2.3.0.DATAREDIS-1119-SNAPSHOT</version>

<name>Spring Data Redis</name>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumers;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoGroups;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoStream;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.connection.stream.StringRecord;
Expand Down Expand Up @@ -3666,7 +3669,8 @@ public List<RecordId> xClaimJustId(String key, String group, String consumer, XC
*/
@Override
public List<StringRecord> xClaim(String key, String group, String consumer, XClaimOptions options) {
return convertAndReturn(delegate.xClaim(serialize(key), group, consumer, options), listByteMapRecordToStringMapRecordConverter);
return convertAndReturn(delegate.xClaim(serialize(key), group, consumer, options),
listByteMapRecordToStringMapRecordConverter);
}

/*
Expand Down Expand Up @@ -3705,6 +3709,33 @@ public Boolean xGroupDestroy(String key, String group) {
return convertAndReturn(delegate.xGroupDestroy(serialize(key), group), identityConverter);
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.StringRedisConnection#xInfo(java.lang.String)
*/
@Override
public XInfoStream xInfo(String key) {
return convertAndReturn(delegate.xInfo(serialize(key)), identityConverter);
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.StringRedisConnection#xInfoGroups(java.lang.String)
*/
@Override
public XInfoGroups xInfoGroups(String key) {
return convertAndReturn(delegate.xInfoGroups(serialize(key)), identityConverter);
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.StringRedisConnection#xInfoConsumers(java.lang.String, java.lang.String)
*/
@Override
public XInfoConsumers xInfoConsumers(String key, String groupName) {
return convertAndReturn(delegate.xInfoConsumers(serialize(key), groupName), identityConverter);
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.StringRedisConnection#xLen(java.lang.String)
Expand Down Expand Up @@ -3875,6 +3906,33 @@ public Boolean xGroupDestroy(byte[] key, String groupName) {
return delegate.xGroupDestroy(key, groupName);
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.RedisStreamCommands#xInfo(byte[])
*/
@Override
public XInfoStream xInfo(byte[] key) {
return delegate.xInfo(key);
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.RedisStreamCommands#xInfoGroups(byte[])
*/
@Override
public XInfoGroups xInfoGroups(byte[] key) {
return delegate.xInfoGroups(key);
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.RedisStreamCommands#xInfoConsumers(byte[], java.lang.String)
*/
@Override
public XInfoConsumers xInfoConsumers(byte[] key, String groupName) {
return delegate.xInfoConsumers(key, groupName);
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.RedisStreamCommands#xLen(byte[])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumers;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoGroups;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoStream;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.Cursor;
Expand Down Expand Up @@ -495,6 +498,27 @@ default Boolean xGroupDestroy(byte[] key, String groupName) {
return streamCommands().xGroupDestroy(key, groupName);
}

/** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
@Override
@Deprecated
default XInfoStream xInfo(byte[] key) {
return streamCommands().xInfo(key);
}

/** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
@Override
@Deprecated
default XInfoGroups xInfoGroups(byte[] key) {
return streamCommands().xInfoGroups(key);
}

/** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
@Override
@Deprecated
default XInfoConsumers xInfoConsumers(byte[] key, String groupName) {
return streamCommands().xInfoConsumers(key, groupName);
}

/** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
@Override
@Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumer;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoGroup;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoStream;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.connection.stream.StreamRecords;
Expand Down Expand Up @@ -1000,6 +1003,101 @@ default Flux<ByteBufferRecord> xRead(StreamReadOptions readOptions, StreamOffset
*/
Flux<CommandResponse<ReadCommand, Flux<ByteBufferRecord>>> read(Publisher<ReadCommand> commands);

/**
* @author Christoph Strobl
* @since 2.3
*/
class XInfoCommand extends KeyCommand {

private final @Nullable String groupName;

public XInfoCommand(@Nullable ByteBuffer key, @Nullable String groupName) {

super(key);
this.groupName = groupName;
}

public static XInfoCommand xInfo() {
return new XInfoCommand(null, null);
}

public XInfoCommand of(ByteBuffer key) {
return new XInfoCommand(key, groupName);
}

public XInfoCommand consumersIn(String groupName) {
return new XInfoCommand(getKey(), groupName);
}

public String getGroupName() {
return groupName;
}
}

/**
* Obtain general information about the stream stored at the specified {@literal key}.
*
* @param key the {@literal key} the stream is stored at.
* @return a {@link Mono} emitting {@link XInfoStream} when ready.
* @since 2.3
*/
default Mono<XInfoStream> xInfo(ByteBuffer key) {
return xInfo(Mono.just(XInfoCommand.xInfo().of(key))).next().map(CommandResponse::getOutput);
}

/**
* Obtain general information about the stream stored at the specified {@literal key}.
*
* @param commands must not be {@literal null}.
* @return never {@literal null}.
* @since 2.3
*/
Flux<CommandResponse<XInfoCommand, XInfoStream>> xInfo(Publisher<XInfoCommand> commands);

/**
* Obtain general information about the stream stored at the specified {@literal key}.
*
* @param key the {@literal key} the stream is stored at.
* @return a {@link Flux} emitting consumer group info one by one.
* @since 2.3
*/
default Flux<XInfoGroup> xInfoGroups(ByteBuffer key) {
return xInfoGroups(Mono.just(XInfoCommand.xInfo().of(key))).next().flatMapMany(CommandResponse::getOutput);
}

/**
* Obtain general information about the stream stored at the specified {@literal key}.
*
* @param commands must not be {@literal null}.
* @return never {@literal null}.
* @since 2.3
*/
Flux<CommandResponse<XInfoCommand, Flux<XInfoGroup>>> xInfoGroups(Publisher<XInfoCommand> commands);

/**
* Obtain information about every consumer in a specific {@literal consumer group} for the stream stored at the
* specified {@literal key}.
*
* @param key the {@literal key} the stream is stored at.
* @param groupName name of the {@literal consumer group}.
* @return a {@link Flux} emitting consumer info one by one.
* @since 2.3
*/
default Flux<XInfoConsumer> xInfoConsumers(ByteBuffer key, String groupName) {
return xInfoConsumers(Mono.just(XInfoCommand.xInfo().of(key).consumersIn(groupName))).next()
.flatMapMany(CommandResponse::getOutput);
}

/**
* Obtain information about every consumer in a specific {@literal consumer group} for the stream stored at the
* specified {@literal key}.
*
* @param commands must not be {@literal null}.
* @return never {@literal null}.
* @since 2.3
*/
Flux<CommandResponse<XInfoCommand, Flux<XInfoConsumer>>> xInfoConsumers(Publisher<XInfoCommand> commands);

class GroupCommand extends KeyCommand {

private final GroupCommandAction action;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.RedisZSetCommands.Limit;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumers;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoGroups;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoStream;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
Expand Down Expand Up @@ -404,6 +407,39 @@ default Boolean xGroupDelConsumer(byte[] key, String groupName, String consumerN
@Nullable
Boolean xGroupDestroy(byte[] key, String groupName);

/**
* Obtain general information about the stream stored at the specified {@literal key}.
*
* @param key the {@literal key} the stream is stored at.
* @return {@literal null} when used in pipeline / transaction.
* @since 2.3
*/
@Nullable
XInfoStream xInfo(byte[] key);

/**
* Obtain information about {@literal consumer groups} associated with the stream stored at the specified
* {@literal key}.
*
* @param key the {@literal key} the stream is stored at.
* @return {@literal null} when used in pipeline / transaction.
* @since 2.3
*/
@Nullable
XInfoGroups xInfoGroups(byte[] key);

/**
* Obtain information about every consumer in a specific {@literal consumer group} for the stream stored at the
* specified {@literal key}.
*
* @param key the {@literal key} the stream is stored at.
* @param groupName name of the {@literal consumer group}.
* @return {@literal null} when used in pipeline / transaction.
* @since 2.3
*/
@Nullable
XInfoConsumers xInfoConsumers(byte[] key, String groupName);

/**
* Get the length of a stream.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumers;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoGroups;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoStream;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.connection.stream.StreamRecords;
Expand Down Expand Up @@ -2107,6 +2110,39 @@ default Long xDel(String key, String... entryIds) {
@Nullable
Boolean xGroupDestroy(String key, String group);

/**
* Obtain general information about the stream stored at the specified {@literal key}.
*
* @param key the {@literal key} the stream is stored at.
* @return {@literal null} when used in pipeline / transaction.
* @since 2.3
*/
@Nullable
XInfoStream xInfo(String key);

/**
* Obtain information about {@literal consumer groups} associated with the stream stored at the specified
* {@literal key}.
*
* @param key the {@literal key} the stream is stored at.
* @return {@literal null} when used in pipeline / transaction.
* @since 2.3
*/
@Nullable
XInfoGroups xInfoGroups(String key);

/**
* Obtain information about every consumer in a specific {@literal consumer group} for the stream stored at the
* specified {@literal key}.
*
* @param key the {@literal key} the stream is stored at.
* @param groupName name of the {@literal consumer group}.
* @return {@literal null} when used in pipeline / transaction.
* @since 2.3
*/
@Nullable
XInfoConsumers xInfoConsumers(String key, String groupName);

/**
* Get the length of a stream.
*
Expand Down
Loading