Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,18 @@
<configuration>
<configFile>${project.basedir}/hbase-formatter.xml</configFile>
<directories>
<directory>${project.basedir}/src/main/java/redis/clients/jedis/annots</directory>
<directory>${project.basedir}</directory>
</directories>
<includes>
<!-- Specific files -->
<include>src/main/java/redis/clients/jedis/annots/*.java</include>
<include>src/main/java/redis/clients/jedis/resps/StreamEntryDeletionResult.java</include>
<include>src/main/java/redis/clients/jedisargs/StreamDeletionPolicy.java</include>
<include>src/test/java/redis/clients/jedis/commands/StreamsCommandsTestBase.java</include>
<include>src/test/java/redis/clients/jedis/commands/jedis/ClusterStreamsCommandsTest.java</include>
<include>src/test/java/redis/clients/jedis/commands/jedis/PooledStreamsCommandsTest.java</include>
<include>src/test/java/redis/clients/jedis/resps/StreamEntryDeletionResultTest.java</include>
</includes>
</configuration>
<executions>
<execution>
Expand Down
36 changes: 36 additions & 0 deletions src/main/java/redis/clients/jedis/BuilderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,42 @@ public List<StreamEntryID> build(Object data) {
}
};

public static final Builder<StreamEntryDeletionResult> STREAM_ENTRY_DELETION_RESULT = new Builder<StreamEntryDeletionResult>() {
@Override
public StreamEntryDeletionResult build(Object data) {
if (data == null) {
return null;
}
return StreamEntryDeletionResult.fromLong((Long) data);
}

@Override
public String toString() {
return "StreamEntryDeletionResult";
}
};

public static final Builder<List<StreamEntryDeletionResult>> STREAM_ENTRY_DELETION_RESULT_LIST = new Builder<List<StreamEntryDeletionResult>>() {
@Override
@SuppressWarnings("unchecked")
public List<StreamEntryDeletionResult> build(Object data) {
if (data == null) {
return null;
}
List<Object> objectList = (List<Object>) data;
List<StreamEntryDeletionResult> responses = new ArrayList<>(objectList.size());
for (Object object : objectList) {
responses.add(STREAM_ENTRY_DELETION_RESULT.build(object));
}
return responses;
}

@Override
public String toString() {
return "List<StreamEntryDeletionResult>";
}
};

public static final Builder<StreamEntry> STREAM_ENTRY = new Builder<StreamEntry>() {
@Override
@SuppressWarnings("unchecked")
Expand Down
32 changes: 32 additions & 0 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -2626,10 +2626,26 @@ public final CommandObject<Long> xack(String key, String group, StreamEntryID...
return new CommandObject<>(commandArguments(XACK).key(key).add(group).addObjects((Object[]) ids), BuilderFactory.LONG);
}

public final CommandObject<List<StreamEntryDeletionResult>> xackdel(String key, String group, StreamEntryID... ids) {
return new CommandObject<>(commandArguments(XACKDEL).key(key).add(group).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST);
}

public final CommandObject<List<StreamEntryDeletionResult>> xackdel(String key, String group, StreamDeletionPolicy trimMode, StreamEntryID... ids) {
return new CommandObject<>(commandArguments(XACKDEL).key(key).add(group).add(trimMode).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST);
}

public final CommandObject<Long> xack(byte[] key, byte[] group, byte[]... ids) {
return new CommandObject<>(commandArguments(XACK).key(key).add(group).addObjects((Object[]) ids), BuilderFactory.LONG);
}

public final CommandObject<List<StreamEntryDeletionResult>> xackdel(byte[] key, byte[] group, byte[]... ids) {
return new CommandObject<>(commandArguments(XACKDEL).key(key).add(group).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST);
}

public final CommandObject<List<StreamEntryDeletionResult>> xackdel(byte[] key, byte[] group, StreamDeletionPolicy trimMode, byte[]... ids) {
return new CommandObject<>(commandArguments(XACKDEL).key(key).add(group).add(trimMode).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST);
}

public final CommandObject<String> xgroupCreate(String key, String groupName, StreamEntryID id, boolean makeStream) {
CommandArguments args = commandArguments(XGROUP).add(CREATE).key(key)
.add(groupName).add(id == null ? "0-0" : id);
Expand Down Expand Up @@ -2687,6 +2703,14 @@ public final CommandObject<Long> xdel(String key, StreamEntryID... ids) {
return new CommandObject<>(commandArguments(XDEL).key(key).addObjects((Object[]) ids), BuilderFactory.LONG);
}

public final CommandObject<List<StreamEntryDeletionResult>> xdelex(String key, StreamEntryID... ids) {
return new CommandObject<>(commandArguments(XDELEX).key(key).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST);
}

public final CommandObject<List<StreamEntryDeletionResult>> xdelex(String key, StreamDeletionPolicy trimMode, StreamEntryID... ids) {
return new CommandObject<>(commandArguments(XDELEX).key(key).add(trimMode).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST);
}

public final CommandObject<Long> xtrim(String key, long maxLen, boolean approximate) {
CommandArguments args = commandArguments(XTRIM).key(key).add(MAXLEN);
if (approximate) args.add(Protocol.BYTES_TILDE);
Expand All @@ -2702,6 +2726,14 @@ public final CommandObject<Long> xdel(byte[] key, byte[]... ids) {
return new CommandObject<>(commandArguments(XDEL).key(key).addObjects((Object[]) ids), BuilderFactory.LONG);
}

public final CommandObject<List<StreamEntryDeletionResult>> xdelex(byte[] key, byte[]... ids) {
return new CommandObject<>(commandArguments(XDELEX).key(key).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST);
}

public final CommandObject<List<StreamEntryDeletionResult>> xdelex(byte[] key, StreamDeletionPolicy trimMode, byte[]... ids) {
return new CommandObject<>(commandArguments(XDELEX).key(key).add(trimMode).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST);
}

public final CommandObject<Long> xtrim(byte[] key, long maxLen, boolean approximateLength) {
CommandArguments args = commandArguments(XTRIM).key(key).add(MAXLEN);
if (approximateLength) args.add(Protocol.BYTES_TILDE);
Expand Down
48 changes: 48 additions & 0 deletions src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4866,6 +4866,18 @@ public long xack(byte[] key, byte[] group, byte[]... ids) {
return connection.executeCommand(commandObjects.xack(key, group, ids));
}

@Override
public List<StreamEntryDeletionResult> xackdel(byte[] key, byte[] group, byte[]... ids) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xackdel(key, group, ids));
}

@Override
public List<StreamEntryDeletionResult> xackdel(byte[] key, byte[] group, StreamDeletionPolicy trimMode, byte[]... ids) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xackdel(key, group, trimMode, ids));
}

@Override
public String xgroupCreate(byte[] key, byte[] consumer, byte[] id, boolean makeStream) {
checkIsInMultiOrPipeline();
Expand Down Expand Up @@ -4902,6 +4914,18 @@ public long xdel(byte[] key, byte[]... ids) {
return connection.executeCommand(commandObjects.xdel(key, ids));
}

@Override
public List<StreamEntryDeletionResult> xdelex(byte[] key, byte[]... ids) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xdelex(key, ids));
}

@Override
public List<StreamEntryDeletionResult> xdelex(byte[] key, StreamDeletionPolicy trimMode, byte[]... ids) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xdelex(key, trimMode, ids));
}

@Override
public long xtrim(byte[] key, long maxLen, boolean approximateLength) {
checkIsInMultiOrPipeline();
Expand Down Expand Up @@ -9677,6 +9701,18 @@ public long xack(final String key, final String group, final StreamEntryID... id
return connection.executeCommand(commandObjects.xack(key, group, ids));
}

@Override
public List<StreamEntryDeletionResult> xackdel(final String key, final String group, final StreamEntryID... ids) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xackdel(key, group, ids));
}

@Override
public List<StreamEntryDeletionResult> xackdel(final String key, final String group, final StreamDeletionPolicy trimMode, final StreamEntryID... ids) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xackdel(key, group, trimMode, ids));
}

@Override
public String xgroupCreate(final String key, final String groupName, final StreamEntryID id,
final boolean makeStream) {
Expand Down Expand Up @@ -9714,6 +9750,18 @@ public long xdel(final String key, final StreamEntryID... ids) {
return connection.executeCommand(commandObjects.xdel(key, ids));
}

@Override
public List<StreamEntryDeletionResult> xdelex(final String key, final StreamEntryID... ids) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xdelex(key, ids));
}

@Override
public List<StreamEntryDeletionResult> xdelex(final String key, final StreamDeletionPolicy trimMode, final StreamEntryID... ids) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xdelex(key, trimMode, ids));
}

@Override
public long xtrim(final String key, final long maxLen, final boolean approximateLength) {
checkIsInMultiOrPipeline();
Expand Down
40 changes: 40 additions & 0 deletions src/main/java/redis/clients/jedis/PipeliningBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -1552,6 +1552,16 @@ public Response<Long> xack(String key, String group, StreamEntryID... ids) {
return appendCommand(commandObjects.xack(key, group, ids));
}

@Override
public Response<List<StreamEntryDeletionResult>> xackdel(String key, String group, StreamEntryID... ids) {
return appendCommand(commandObjects.xackdel(key, group, ids));
}

@Override
public Response<List<StreamEntryDeletionResult>> xackdel(String key, String group, StreamDeletionPolicy trimMode, StreamEntryID... ids) {
return appendCommand(commandObjects.xackdel(key, group, trimMode, ids));
}

@Override
public Response<String> xgroupCreate(String key, String groupName, StreamEntryID id, boolean makeStream) {
return appendCommand(commandObjects.xgroupCreate(key, groupName, id, makeStream));
Expand Down Expand Up @@ -1592,6 +1602,16 @@ public Response<Long> xdel(String key, StreamEntryID... ids) {
return appendCommand(commandObjects.xdel(key, ids));
}

@Override
public Response<List<StreamEntryDeletionResult>> xdelex(String key, StreamEntryID... ids) {
return appendCommand(commandObjects.xdelex(key, ids));
}

@Override
public Response<List<StreamEntryDeletionResult>> xdelex(String key, StreamDeletionPolicy trimMode, StreamEntryID... ids) {
return appendCommand(commandObjects.xdelex(key, trimMode, ids));
}

@Override
public Response<Long> xtrim(String key, long maxLen, boolean approximate) {
return appendCommand(commandObjects.xtrim(key, maxLen, approximate));
Expand Down Expand Up @@ -3264,6 +3284,16 @@ public Response<Long> xack(byte[] key, byte[] group, byte[]... ids) {
return appendCommand(commandObjects.xack(key, group, ids));
}

@Override
public Response<List<StreamEntryDeletionResult>> xackdel(byte[] key, byte[] group, byte[]... ids) {
return appendCommand(commandObjects.xackdel(key, group, ids));
}

@Override
public Response<List<StreamEntryDeletionResult>> xackdel(byte[] key, byte[] group, StreamDeletionPolicy trimMode, byte[]... ids) {
return appendCommand(commandObjects.xackdel(key, group, trimMode, ids));
}

@Override
public Response<String> xgroupCreate(byte[] key, byte[] groupName, byte[] id, boolean makeStream) {
return appendCommand(commandObjects.xgroupCreate(key, groupName, id, makeStream));
Expand Down Expand Up @@ -3294,6 +3324,16 @@ public Response<Long> xdel(byte[] key, byte[]... ids) {
return appendCommand(commandObjects.xdel(key, ids));
}

@Override
public Response<List<StreamEntryDeletionResult>> xdelex(byte[] key, byte[]... ids) {
return appendCommand(commandObjects.xdelex(key, ids));
}

@Override
public Response<List<StreamEntryDeletionResult>> xdelex(byte[] key, StreamDeletionPolicy trimMode, byte[]... ids) {
return appendCommand(commandObjects.xdelex(key, trimMode, ids));
}

@Override
public Response<Long> xtrim(byte[] key, long maxLen, boolean approximateLength) {
return appendCommand(commandObjects.xtrim(key, maxLen, approximateLength));
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ public static enum Command implements ProtocolCommand {
GEORADIUSBYMEMBER, GEORADIUSBYMEMBER_RO, // <-- geo
PFADD, PFCOUNT, PFMERGE, // <-- hyper log log
XADD, XLEN, XDEL, XTRIM, XRANGE, XREVRANGE, XREAD, XACK, XGROUP, XREADGROUP, XPENDING, XCLAIM,
XAUTOCLAIM, XINFO, // <-- stream
XAUTOCLAIM, XINFO, XDELEX, XACKDEL, // <-- stream
EVAL, EVALSHA, SCRIPT, EVAL_RO, EVALSHA_RO, FUNCTION, FCALL, FCALL_RO, // <-- program
SUBSCRIBE, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, PUBLISH, PUBSUB,
SSUBSCRIBE, SUNSUBSCRIBE, SPUBLISH, // <-- pub sub
Expand Down
40 changes: 40 additions & 0 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -3201,6 +3201,16 @@ public long xack(String key, String group, StreamEntryID... ids) {
return executeCommand(commandObjects.xack(key, group, ids));
}

@Override
public List<StreamEntryDeletionResult> xackdel(String key, String group, StreamEntryID... ids) {
return executeCommand(commandObjects.xackdel(key, group, ids));
}

@Override
public List<StreamEntryDeletionResult> xackdel(String key, String group, StreamDeletionPolicy trimMode, StreamEntryID... ids) {
return executeCommand(commandObjects.xackdel(key, group, trimMode, ids));
}

@Override
public String xgroupCreate(String key, String groupName, StreamEntryID id, boolean makeStream) {
return executeCommand(commandObjects.xgroupCreate(key, groupName, id, makeStream));
Expand Down Expand Up @@ -3241,6 +3251,16 @@ public long xdel(String key, StreamEntryID... ids) {
return executeCommand(commandObjects.xdel(key, ids));
}

@Override
public List<StreamEntryDeletionResult> xdelex(String key, StreamEntryID... ids) {
return executeCommand(commandObjects.xdelex(key, ids));
}

@Override
public List<StreamEntryDeletionResult> xdelex(String key, StreamDeletionPolicy trimMode, StreamEntryID... ids) {
return executeCommand(commandObjects.xdelex(key, trimMode, ids));
}

@Override
public long xtrim(String key, long maxLen, boolean approximate) {
return executeCommand(commandObjects.xtrim(key, maxLen, approximate));
Expand Down Expand Up @@ -3356,6 +3376,16 @@ public long xack(byte[] key, byte[] group, byte[]... ids) {
return executeCommand(commandObjects.xack(key, group, ids));
}

@Override
public List<StreamEntryDeletionResult> xackdel(byte[] key, byte[] group, byte[]... ids) {
return executeCommand(commandObjects.xackdel(key, group, ids));
}

@Override
public List<StreamEntryDeletionResult> xackdel(byte[] key, byte[] group, StreamDeletionPolicy trimMode, byte[]... ids) {
return executeCommand(commandObjects.xackdel(key, group, trimMode, ids));
}

@Override
public String xgroupCreate(byte[] key, byte[] groupName, byte[] id, boolean makeStream) {
return executeCommand(commandObjects.xgroupCreate(key, groupName, id, makeStream));
Expand Down Expand Up @@ -3386,6 +3416,16 @@ public long xdel(byte[] key, byte[]... ids) {
return executeCommand(commandObjects.xdel(key, ids));
}

@Override
public List<StreamEntryDeletionResult> xdelex(byte[] key, byte[]... ids) {
return executeCommand(commandObjects.xdelex(key, ids));
}

@Override
public List<StreamEntryDeletionResult> xdelex(byte[] key, StreamDeletionPolicy trimMode, byte[]... ids) {
return executeCommand(commandObjects.xdelex(key, trimMode, ids));
}

@Override
public long xtrim(byte[] key, long maxLen, boolean approximateLength) {
return executeCommand(commandObjects.xtrim(key, maxLen, approximateLength));
Expand Down
38 changes: 38 additions & 0 deletions src/main/java/redis/clients/jedis/args/StreamDeletionPolicy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package redis.clients.jedis.args;

import redis.clients.jedis.util.SafeEncoder;

/**
* Deletion policy for stream commands that handle consumer group references. Used with XDELEX,
* XACKDEL, and enhanced XADD/XTRIM commands.
*/
public enum StreamDeletionPolicy implements Rawable {

/**
* Preserves existing references to entries in all consumer groups' PEL. This is the default
* behavior similar to XDEL.
*/
KEEP_REFERENCES("KEEPREF"),

/**
* Removes all references to entries from all consumer groups' pending entry lists, effectively
* cleaning up all traces of the messages.
*/
DELETE_REFERENCES("DELREF"),

/**
* Only operates on entries that were read and acknowledged by all consumer groups.
*/
ACKNOWLEDGED("ACKED");

private final byte[] raw;

StreamDeletionPolicy(String redisParamName) {
raw = SafeEncoder.encode(redisParamName);
}

@Override
public byte[] getRaw() {
return raw;
}
}
Loading
Loading