diff --git a/pom.xml b/pom.xml index 22ea1cdce7..7462072032 100644 --- a/pom.xml +++ b/pom.xml @@ -335,8 +335,18 @@ ${project.basedir}/hbase-formatter.xml - ${project.basedir}/src/main/java/redis/clients/jedis/annots + ${project.basedir} + + + src/main/java/redis/clients/jedis/annots/*.java + src/main/java/redis/clients/jedis/resps/StreamEntryDeletionResult.java + src/main/java/redis/clients/jedisargs/StreamDeletionPolicy.java + src/test/java/redis/clients/jedis/commands/StreamsCommandsTestBase.java + src/test/java/redis/clients/jedis/commands/jedis/ClusterStreamsCommandsTest.java + src/test/java/redis/clients/jedis/commands/jedis/PooledStreamsCommandsTest.java + src/test/java/redis/clients/jedis/resps/StreamEntryDeletionResultTest.java + diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index 15bb0122cd..585d4bfd29 100644 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -1261,6 +1261,42 @@ public List build(Object data) { } }; + public static final Builder STREAM_ENTRY_DELETION_RESULT = new Builder() { + @Override + public StreamEntryDeletionResult build(Object data) { + if (data == null) { + return null; + } + return StreamEntryDeletionResult.fromLong((Long) data); + } + + @Override + public String toString() { + return "StreamEntryDeletionResult"; + } + }; + + public static final Builder> STREAM_ENTRY_DELETION_RESULT_LIST = new Builder>() { + @Override + @SuppressWarnings("unchecked") + public List build(Object data) { + if (data == null) { + return null; + } + List objectList = (List) data; + List responses = new ArrayList<>(objectList.size()); + for (Object object : objectList) { + responses.add(STREAM_ENTRY_DELETION_RESULT.build(object)); + } + return responses; + } + + @Override + public String toString() { + return "List"; + } + }; + public static final Builder STREAM_ENTRY = new Builder() { @Override @SuppressWarnings("unchecked") diff --git a/src/main/java/redis/clients/jedis/CommandObjects.java b/src/main/java/redis/clients/jedis/CommandObjects.java index ea4930f894..5533ecf1f5 100644 --- a/src/main/java/redis/clients/jedis/CommandObjects.java +++ b/src/main/java/redis/clients/jedis/CommandObjects.java @@ -2626,10 +2626,26 @@ public final CommandObject xack(String key, String group, StreamEntryID... return new CommandObject<>(commandArguments(XACK).key(key).add(group).addObjects((Object[]) ids), BuilderFactory.LONG); } + public final CommandObject> xackdel(String key, String group, StreamEntryID... ids) { + return new CommandObject<>(commandArguments(XACKDEL).key(key).add(group).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST); + } + + public final CommandObject> xackdel(String key, String group, StreamDeletionPolicy trimMode, StreamEntryID... ids) { + return new CommandObject<>(commandArguments(XACKDEL).key(key).add(group).add(trimMode).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST); + } + public final CommandObject xack(byte[] key, byte[] group, byte[]... ids) { return new CommandObject<>(commandArguments(XACK).key(key).add(group).addObjects((Object[]) ids), BuilderFactory.LONG); } + public final CommandObject> xackdel(byte[] key, byte[] group, byte[]... ids) { + return new CommandObject<>(commandArguments(XACKDEL).key(key).add(group).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST); + } + + public final CommandObject> xackdel(byte[] key, byte[] group, StreamDeletionPolicy trimMode, byte[]... ids) { + return new CommandObject<>(commandArguments(XACKDEL).key(key).add(group).add(trimMode).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST); + } + public final CommandObject xgroupCreate(String key, String groupName, StreamEntryID id, boolean makeStream) { CommandArguments args = commandArguments(XGROUP).add(CREATE).key(key) .add(groupName).add(id == null ? "0-0" : id); @@ -2687,6 +2703,14 @@ public final CommandObject xdel(String key, StreamEntryID... ids) { return new CommandObject<>(commandArguments(XDEL).key(key).addObjects((Object[]) ids), BuilderFactory.LONG); } + public final CommandObject> xdelex(String key, StreamEntryID... ids) { + return new CommandObject<>(commandArguments(XDELEX).key(key).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST); + } + + public final CommandObject> xdelex(String key, StreamDeletionPolicy trimMode, StreamEntryID... ids) { + return new CommandObject<>(commandArguments(XDELEX).key(key).add(trimMode).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST); + } + public final CommandObject xtrim(String key, long maxLen, boolean approximate) { CommandArguments args = commandArguments(XTRIM).key(key).add(MAXLEN); if (approximate) args.add(Protocol.BYTES_TILDE); @@ -2702,6 +2726,14 @@ public final CommandObject xdel(byte[] key, byte[]... ids) { return new CommandObject<>(commandArguments(XDEL).key(key).addObjects((Object[]) ids), BuilderFactory.LONG); } + public final CommandObject> xdelex(byte[] key, byte[]... ids) { + return new CommandObject<>(commandArguments(XDELEX).key(key).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST); + } + + public final CommandObject> xdelex(byte[] key, StreamDeletionPolicy trimMode, byte[]... ids) { + return new CommandObject<>(commandArguments(XDELEX).key(key).add(trimMode).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST); + } + public final CommandObject xtrim(byte[] key, long maxLen, boolean approximateLength) { CommandArguments args = commandArguments(XTRIM).key(key).add(MAXLEN); if (approximateLength) args.add(Protocol.BYTES_TILDE); diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index e19a4fa619..6d50b8814c 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -4866,6 +4866,18 @@ public long xack(byte[] key, byte[] group, byte[]... ids) { return connection.executeCommand(commandObjects.xack(key, group, ids)); } + @Override + public List xackdel(byte[] key, byte[] group, byte[]... ids) { + checkIsInMultiOrPipeline(); + return connection.executeCommand(commandObjects.xackdel(key, group, ids)); + } + + @Override + public List xackdel(byte[] key, byte[] group, StreamDeletionPolicy trimMode, byte[]... ids) { + checkIsInMultiOrPipeline(); + return connection.executeCommand(commandObjects.xackdel(key, group, trimMode, ids)); + } + @Override public String xgroupCreate(byte[] key, byte[] consumer, byte[] id, boolean makeStream) { checkIsInMultiOrPipeline(); @@ -4902,6 +4914,18 @@ public long xdel(byte[] key, byte[]... ids) { return connection.executeCommand(commandObjects.xdel(key, ids)); } + @Override + public List xdelex(byte[] key, byte[]... ids) { + checkIsInMultiOrPipeline(); + return connection.executeCommand(commandObjects.xdelex(key, ids)); + } + + @Override + public List xdelex(byte[] key, StreamDeletionPolicy trimMode, byte[]... ids) { + checkIsInMultiOrPipeline(); + return connection.executeCommand(commandObjects.xdelex(key, trimMode, ids)); + } + @Override public long xtrim(byte[] key, long maxLen, boolean approximateLength) { checkIsInMultiOrPipeline(); @@ -9677,6 +9701,18 @@ public long xack(final String key, final String group, final StreamEntryID... id return connection.executeCommand(commandObjects.xack(key, group, ids)); } + @Override + public List xackdel(final String key, final String group, final StreamEntryID... ids) { + checkIsInMultiOrPipeline(); + return connection.executeCommand(commandObjects.xackdel(key, group, ids)); + } + + @Override + public List xackdel(final String key, final String group, final StreamDeletionPolicy trimMode, final StreamEntryID... ids) { + checkIsInMultiOrPipeline(); + return connection.executeCommand(commandObjects.xackdel(key, group, trimMode, ids)); + } + @Override public String xgroupCreate(final String key, final String groupName, final StreamEntryID id, final boolean makeStream) { @@ -9714,6 +9750,18 @@ public long xdel(final String key, final StreamEntryID... ids) { return connection.executeCommand(commandObjects.xdel(key, ids)); } + @Override + public List xdelex(final String key, final StreamEntryID... ids) { + checkIsInMultiOrPipeline(); + return connection.executeCommand(commandObjects.xdelex(key, ids)); + } + + @Override + public List xdelex(final String key, final StreamDeletionPolicy trimMode, final StreamEntryID... ids) { + checkIsInMultiOrPipeline(); + return connection.executeCommand(commandObjects.xdelex(key, trimMode, ids)); + } + @Override public long xtrim(final String key, final long maxLen, final boolean approximateLength) { checkIsInMultiOrPipeline(); diff --git a/src/main/java/redis/clients/jedis/PipeliningBase.java b/src/main/java/redis/clients/jedis/PipeliningBase.java index f7b974f552..7274e8c7b3 100644 --- a/src/main/java/redis/clients/jedis/PipeliningBase.java +++ b/src/main/java/redis/clients/jedis/PipeliningBase.java @@ -1552,6 +1552,16 @@ public Response xack(String key, String group, StreamEntryID... ids) { return appendCommand(commandObjects.xack(key, group, ids)); } + @Override + public Response> xackdel(String key, String group, StreamEntryID... ids) { + return appendCommand(commandObjects.xackdel(key, group, ids)); + } + + @Override + public Response> xackdel(String key, String group, StreamDeletionPolicy trimMode, StreamEntryID... ids) { + return appendCommand(commandObjects.xackdel(key, group, trimMode, ids)); + } + @Override public Response xgroupCreate(String key, String groupName, StreamEntryID id, boolean makeStream) { return appendCommand(commandObjects.xgroupCreate(key, groupName, id, makeStream)); @@ -1592,6 +1602,16 @@ public Response xdel(String key, StreamEntryID... ids) { return appendCommand(commandObjects.xdel(key, ids)); } + @Override + public Response> xdelex(String key, StreamEntryID... ids) { + return appendCommand(commandObjects.xdelex(key, ids)); + } + + @Override + public Response> xdelex(String key, StreamDeletionPolicy trimMode, StreamEntryID... ids) { + return appendCommand(commandObjects.xdelex(key, trimMode, ids)); + } + @Override public Response xtrim(String key, long maxLen, boolean approximate) { return appendCommand(commandObjects.xtrim(key, maxLen, approximate)); @@ -3264,6 +3284,16 @@ public Response xack(byte[] key, byte[] group, byte[]... ids) { return appendCommand(commandObjects.xack(key, group, ids)); } + @Override + public Response> xackdel(byte[] key, byte[] group, byte[]... ids) { + return appendCommand(commandObjects.xackdel(key, group, ids)); + } + + @Override + public Response> xackdel(byte[] key, byte[] group, StreamDeletionPolicy trimMode, byte[]... ids) { + return appendCommand(commandObjects.xackdel(key, group, trimMode, ids)); + } + @Override public Response xgroupCreate(byte[] key, byte[] groupName, byte[] id, boolean makeStream) { return appendCommand(commandObjects.xgroupCreate(key, groupName, id, makeStream)); @@ -3294,6 +3324,16 @@ public Response xdel(byte[] key, byte[]... ids) { return appendCommand(commandObjects.xdel(key, ids)); } + @Override + public Response> xdelex(byte[] key, byte[]... ids) { + return appendCommand(commandObjects.xdelex(key, ids)); + } + + @Override + public Response> xdelex(byte[] key, StreamDeletionPolicy trimMode, byte[]... ids) { + return appendCommand(commandObjects.xdelex(key, trimMode, ids)); + } + @Override public Response xtrim(byte[] key, long maxLen, boolean approximateLength) { return appendCommand(commandObjects.xtrim(key, maxLen, approximateLength)); diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index 6d59a8b913..226702cd9f 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -305,7 +305,7 @@ public static enum Command implements ProtocolCommand { GEORADIUSBYMEMBER, GEORADIUSBYMEMBER_RO, // <-- geo PFADD, PFCOUNT, PFMERGE, // <-- hyper log log XADD, XLEN, XDEL, XTRIM, XRANGE, XREVRANGE, XREAD, XACK, XGROUP, XREADGROUP, XPENDING, XCLAIM, - XAUTOCLAIM, XINFO, // <-- stream + XAUTOCLAIM, XINFO, XDELEX, XACKDEL, // <-- stream EVAL, EVALSHA, SCRIPT, EVAL_RO, EVALSHA_RO, FUNCTION, FCALL, FCALL_RO, // <-- program SUBSCRIBE, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, PUBLISH, PUBSUB, SSUBSCRIBE, SUNSUBSCRIBE, SPUBLISH, // <-- pub sub diff --git a/src/main/java/redis/clients/jedis/UnifiedJedis.java b/src/main/java/redis/clients/jedis/UnifiedJedis.java index e3960862fa..e0522f1eda 100644 --- a/src/main/java/redis/clients/jedis/UnifiedJedis.java +++ b/src/main/java/redis/clients/jedis/UnifiedJedis.java @@ -3201,6 +3201,16 @@ public long xack(String key, String group, StreamEntryID... ids) { return executeCommand(commandObjects.xack(key, group, ids)); } + @Override + public List xackdel(String key, String group, StreamEntryID... ids) { + return executeCommand(commandObjects.xackdel(key, group, ids)); + } + + @Override + public List xackdel(String key, String group, StreamDeletionPolicy trimMode, StreamEntryID... ids) { + return executeCommand(commandObjects.xackdel(key, group, trimMode, ids)); + } + @Override public String xgroupCreate(String key, String groupName, StreamEntryID id, boolean makeStream) { return executeCommand(commandObjects.xgroupCreate(key, groupName, id, makeStream)); @@ -3241,6 +3251,16 @@ public long xdel(String key, StreamEntryID... ids) { return executeCommand(commandObjects.xdel(key, ids)); } + @Override + public List xdelex(String key, StreamEntryID... ids) { + return executeCommand(commandObjects.xdelex(key, ids)); + } + + @Override + public List xdelex(String key, StreamDeletionPolicy trimMode, StreamEntryID... ids) { + return executeCommand(commandObjects.xdelex(key, trimMode, ids)); + } + @Override public long xtrim(String key, long maxLen, boolean approximate) { return executeCommand(commandObjects.xtrim(key, maxLen, approximate)); @@ -3356,6 +3376,16 @@ public long xack(byte[] key, byte[] group, byte[]... ids) { return executeCommand(commandObjects.xack(key, group, ids)); } + @Override + public List xackdel(byte[] key, byte[] group, byte[]... ids) { + return executeCommand(commandObjects.xackdel(key, group, ids)); + } + + @Override + public List xackdel(byte[] key, byte[] group, StreamDeletionPolicy trimMode, byte[]... ids) { + return executeCommand(commandObjects.xackdel(key, group, trimMode, ids)); + } + @Override public String xgroupCreate(byte[] key, byte[] groupName, byte[] id, boolean makeStream) { return executeCommand(commandObjects.xgroupCreate(key, groupName, id, makeStream)); @@ -3386,6 +3416,16 @@ public long xdel(byte[] key, byte[]... ids) { return executeCommand(commandObjects.xdel(key, ids)); } + @Override + public List xdelex(byte[] key, byte[]... ids) { + return executeCommand(commandObjects.xdelex(key, ids)); + } + + @Override + public List xdelex(byte[] key, StreamDeletionPolicy trimMode, byte[]... ids) { + return executeCommand(commandObjects.xdelex(key, trimMode, ids)); + } + @Override public long xtrim(byte[] key, long maxLen, boolean approximateLength) { return executeCommand(commandObjects.xtrim(key, maxLen, approximateLength)); diff --git a/src/main/java/redis/clients/jedis/args/StreamDeletionPolicy.java b/src/main/java/redis/clients/jedis/args/StreamDeletionPolicy.java new file mode 100644 index 0000000000..640c71dea9 --- /dev/null +++ b/src/main/java/redis/clients/jedis/args/StreamDeletionPolicy.java @@ -0,0 +1,38 @@ +package redis.clients.jedis.args; + +import redis.clients.jedis.util.SafeEncoder; + +/** + * Deletion policy for stream commands that handle consumer group references. Used with XDELEX, + * XACKDEL, and enhanced XADD/XTRIM commands. + */ +public enum StreamDeletionPolicy implements Rawable { + + /** + * Preserves existing references to entries in all consumer groups' PEL. This is the default + * behavior similar to XDEL. + */ + KEEP_REFERENCES("KEEPREF"), + + /** + * Removes all references to entries from all consumer groups' pending entry lists, effectively + * cleaning up all traces of the messages. + */ + DELETE_REFERENCES("DELREF"), + + /** + * Only operates on entries that were read and acknowledged by all consumer groups. + */ + ACKNOWLEDGED("ACKED"); + + private final byte[] raw; + + StreamDeletionPolicy(String redisParamName) { + raw = SafeEncoder.encode(redisParamName); + } + + @Override + public byte[] getRaw() { + return raw; + } +} diff --git a/src/main/java/redis/clients/jedis/commands/StreamBinaryCommands.java b/src/main/java/redis/clients/jedis/commands/StreamBinaryCommands.java index 5db025ef2d..c2d3fcbb2e 100644 --- a/src/main/java/redis/clients/jedis/commands/StreamBinaryCommands.java +++ b/src/main/java/redis/clients/jedis/commands/StreamBinaryCommands.java @@ -4,8 +4,10 @@ import java.util.Map; import redis.clients.jedis.StreamEntryID; +import redis.clients.jedis.args.StreamDeletionPolicy; import redis.clients.jedis.params.*; import redis.clients.jedis.resps.StreamEntryBinary; +import redis.clients.jedis.resps.StreamEntryDeletionResult; public interface StreamBinaryCommands { @@ -27,6 +29,16 @@ default byte[] xadd(byte[] key, Map hash, XAddParams params) { long xack(byte[] key, byte[] group, byte[]... ids); + /** + * XACKDEL key group [KEEPREF | DELREF | ACKED] IDS numids id [id ...] + */ + List xackdel(byte[] key, byte[] group, byte[]... ids); + + /** + * XACKDEL key group [KEEPREF | DELREF | ACKED] IDS numids id [id ...] + */ + List xackdel(byte[] key, byte[] group, StreamDeletionPolicy trimMode, byte[]... ids); + String xgroupCreate(byte[] key, byte[] groupName, byte[] id, boolean makeStream); String xgroupSetID(byte[] key, byte[] groupName, byte[] id); @@ -39,6 +51,16 @@ default byte[] xadd(byte[] key, Map hash, XAddParams params) { long xdel(byte[] key, byte[]... ids); + /** + * XDELEX key [KEEPREF | DELREF | ACKED] IDS numids id [id ...] + */ + List xdelex(byte[] key, byte[]... ids); + + /** + * XDELEX key [KEEPREF | DELREF | ACKED] IDS numids id [id ...] + */ + List xdelex(byte[] key, StreamDeletionPolicy trimMode, byte[]... ids); + long xtrim(byte[] key, long maxLen, boolean approximateLength); long xtrim(byte[] key, XTrimParams params); diff --git a/src/main/java/redis/clients/jedis/commands/StreamCommands.java b/src/main/java/redis/clients/jedis/commands/StreamCommands.java index 163e11050e..0a61e34811 100644 --- a/src/main/java/redis/clients/jedis/commands/StreamCommands.java +++ b/src/main/java/redis/clients/jedis/commands/StreamCommands.java @@ -4,6 +4,7 @@ import java.util.Map; import redis.clients.jedis.StreamEntryID; +import redis.clients.jedis.args.StreamDeletionPolicy; import redis.clients.jedis.params.*; import redis.clients.jedis.resps.*; @@ -98,6 +99,20 @@ default StreamEntryID xadd(String key, Map hash, XAddParams para */ long xack(String key, String group, StreamEntryID... ids); + /** + * XACKDEL key group [KEEPREF | DELREF | ACKED] IDS numids id [id ...] + * Combines XACK and XDEL functionalities. Acknowledges specified message IDs + * in the given consumer group and attempts to delete corresponding stream entries. + */ + List xackdel(String key, String group, StreamEntryID... ids); + + /** + * XACKDEL key group [KEEPREF | DELREF | ACKED] IDS numids id [id ...] + * Combines XACK and XDEL functionalities. Acknowledges specified message IDs + * in the given consumer group and attempts to delete corresponding stream entries. + */ + List xackdel(String key, String group, StreamDeletionPolicy trimMode, StreamEntryID... ids); + /** * {@code XGROUP CREATE key groupName } */ @@ -128,6 +143,20 @@ default StreamEntryID xadd(String key, Map hash, XAddParams para */ long xdel(String key, StreamEntryID... ids); + /** + * XDELEX key [KEEPREF | DELREF | ACKED] IDS numids id [id ...] + * Extended XDEL command with enhanced control over message entry deletion + * with respect to consumer groups. + */ + List xdelex(String key, StreamEntryID... ids); + + /** + * XDELEX key [KEEPREF | DELREF | ACKED] IDS numids id [id ...] + * Extended XDEL command with enhanced control over message entry deletion + * with respect to consumer groups. + */ + List xdelex(String key, StreamDeletionPolicy trimMode, StreamEntryID... ids); + /** * XTRIM key MAXLEN [~] count */ diff --git a/src/main/java/redis/clients/jedis/commands/StreamPipelineBinaryCommands.java b/src/main/java/redis/clients/jedis/commands/StreamPipelineBinaryCommands.java index 3cda8f079a..bd6b36b1b0 100644 --- a/src/main/java/redis/clients/jedis/commands/StreamPipelineBinaryCommands.java +++ b/src/main/java/redis/clients/jedis/commands/StreamPipelineBinaryCommands.java @@ -5,8 +5,10 @@ import redis.clients.jedis.Response; import redis.clients.jedis.StreamEntryID; +import redis.clients.jedis.args.StreamDeletionPolicy; import redis.clients.jedis.params.*; import redis.clients.jedis.resps.StreamEntryBinary; +import redis.clients.jedis.resps.StreamEntryDeletionResult; public interface StreamPipelineBinaryCommands { @@ -28,6 +30,10 @@ default Response xadd(byte[] key, Map hash, XAddParams p Response xack(byte[] key, byte[] group, byte[]... ids); + Response> xackdel(byte[] key, byte[] group, byte[]... ids); + + Response> xackdel(byte[] key, byte[] group, StreamDeletionPolicy trimMode, byte[]... ids); + Response xgroupCreate(byte[] key, byte[] groupName, byte[] id, boolean makeStream); Response xgroupSetID(byte[] key, byte[] groupName, byte[] id); @@ -40,6 +46,10 @@ default Response xadd(byte[] key, Map hash, XAddParams p Response xdel(byte[] key, byte[]... ids); + Response> xdelex(byte[] key, byte[]... ids); + + Response> xdelex(byte[] key, StreamDeletionPolicy trimMode, byte[]... ids); + Response xtrim(byte[] key, long maxLen, boolean approximateLength); Response xtrim(byte[] key, XTrimParams params); diff --git a/src/main/java/redis/clients/jedis/commands/StreamPipelineCommands.java b/src/main/java/redis/clients/jedis/commands/StreamPipelineCommands.java index d4bda0fb98..e3346363d4 100644 --- a/src/main/java/redis/clients/jedis/commands/StreamPipelineCommands.java +++ b/src/main/java/redis/clients/jedis/commands/StreamPipelineCommands.java @@ -5,6 +5,7 @@ import redis.clients.jedis.Response; import redis.clients.jedis.StreamEntryID; +import redis.clients.jedis.args.StreamDeletionPolicy; import redis.clients.jedis.params.*; import redis.clients.jedis.resps.*; @@ -91,6 +92,16 @@ default Response xadd(String key, Map hash, XAddP */ Response xack(String key, String group, StreamEntryID... ids); + /** + * XACKDEL key group [KEEPREF | DELREF | ACKED] IDS numids id [id ...] + */ + Response> xackdel(String key, String group, StreamEntryID... ids); + + /** + * XACKDEL key group [KEEPREF | DELREF | ACKED] IDS numids id [id ...] + */ + Response> xackdel(String key, String group, StreamDeletionPolicy trimMode, StreamEntryID... ids); + /** * {@code XGROUP CREATE key groupName } */ @@ -131,6 +142,16 @@ default Response xadd(String key, Map hash, XAddP */ Response xdel(String key, StreamEntryID... ids); + /** + * XDELEX key [KEEPREF | DELREF | ACKED] IDS numids id [id ...] + */ + Response> xdelex(String key, StreamEntryID... ids); + + /** + * XDELEX key [KEEPREF | DELREF | ACKED] IDS numids id [id ...] + */ + Response> xdelex(String key, StreamDeletionPolicy trimMode, StreamEntryID... ids); + /** * XTRIM key MAXLEN [~] count */ diff --git a/src/main/java/redis/clients/jedis/params/XAddParams.java b/src/main/java/redis/clients/jedis/params/XAddParams.java index 3575c6e5d2..db6c4deb3a 100644 --- a/src/main/java/redis/clients/jedis/params/XAddParams.java +++ b/src/main/java/redis/clients/jedis/params/XAddParams.java @@ -6,6 +6,7 @@ import redis.clients.jedis.StreamEntryID; import redis.clients.jedis.args.Rawable; import redis.clients.jedis.args.RawableFactory; +import redis.clients.jedis.args.StreamDeletionPolicy; import java.util.Objects; @@ -25,6 +26,8 @@ public class XAddParams implements IParams { private Long limit; + private StreamDeletionPolicy trimMode; + public static XAddParams xAddParams() { return new XAddParams(); } @@ -81,6 +84,17 @@ public XAddParams limit(long limit) { return this; } + /** + * When trimming, defines desired behaviour for handling consumer group references. + * see {@link StreamDeletionPolicy} for details. + * + * @return XAddParams + */ + public XAddParams trimmingMode(StreamDeletionPolicy trimMode) { + this.trimMode = trimMode; + return this; + } + @Override public void addParams(CommandArguments args) { @@ -114,6 +128,10 @@ public void addParams(CommandArguments args) { args.add(Keyword.LIMIT).add(limit); } + if (trimMode != null) { + args.add(trimMode); + } + args.add(id != null ? id : StreamEntryID.NEW_ENTRY); } @@ -122,11 +140,12 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; XAddParams that = (XAddParams) o; - return approximateTrimming == that.approximateTrimming && exactTrimming == that.exactTrimming && nomkstream == that.nomkstream && Objects.equals(id, that.id) && Objects.equals(maxLen, that.maxLen) && Objects.equals(minId, that.minId) && Objects.equals(limit, that.limit); + return approximateTrimming == that.approximateTrimming && exactTrimming == that.exactTrimming && nomkstream == that.nomkstream && Objects.equals(id, that.id) && Objects.equals(maxLen, that.maxLen) && Objects.equals(minId, that.minId) && Objects.equals(limit, that.limit) && trimMode == that.trimMode; } @Override public int hashCode() { - return Objects.hash(id, maxLen, approximateTrimming, exactTrimming, nomkstream, minId, limit); + return Objects.hash(id, maxLen, approximateTrimming, exactTrimming, nomkstream, minId, limit, + trimMode); } } diff --git a/src/main/java/redis/clients/jedis/params/XTrimParams.java b/src/main/java/redis/clients/jedis/params/XTrimParams.java index af77ba81b3..a6c4e8d148 100644 --- a/src/main/java/redis/clients/jedis/params/XTrimParams.java +++ b/src/main/java/redis/clients/jedis/params/XTrimParams.java @@ -3,6 +3,7 @@ import redis.clients.jedis.CommandArguments; import redis.clients.jedis.Protocol; import redis.clients.jedis.Protocol.Keyword; +import redis.clients.jedis.args.StreamDeletionPolicy; import java.util.Objects; @@ -18,6 +19,8 @@ public class XTrimParams implements IParams { private Long limit; + private StreamDeletionPolicy trimMode; + public static XTrimParams xTrimParams() { return new XTrimParams(); } @@ -48,6 +51,17 @@ public XTrimParams limit(long limit) { return this; } + /** + * Defines desired behaviour for handling consumer group references. + * see {@link StreamDeletionPolicy} for details. + * + * @return XAddParams + */ + public XTrimParams trimmingMode(StreamDeletionPolicy trimMode) { + this.trimMode = trimMode; + return this; + } + @Override public void addParams(CommandArguments args) { if (maxLen != null) { @@ -75,6 +89,10 @@ public void addParams(CommandArguments args) { if (limit != null) { args.add(Keyword.LIMIT).add(limit); } + + if (trimMode != null) { + args.add(trimMode); + } } @Override @@ -82,11 +100,11 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; XTrimParams that = (XTrimParams) o; - return approximateTrimming == that.approximateTrimming && exactTrimming == that.exactTrimming && Objects.equals(maxLen, that.maxLen) && Objects.equals(minId, that.minId) && Objects.equals(limit, that.limit); + return approximateTrimming == that.approximateTrimming && exactTrimming == that.exactTrimming && Objects.equals(maxLen, that.maxLen) && Objects.equals(minId, that.minId) && Objects.equals(limit, that.limit) && trimMode == that.trimMode; } @Override public int hashCode() { - return Objects.hash(maxLen, approximateTrimming, exactTrimming, minId, limit); + return Objects.hash(maxLen, approximateTrimming, exactTrimming, minId, limit, trimMode); } } diff --git a/src/main/java/redis/clients/jedis/resps/StreamEntryDeletionResult.java b/src/main/java/redis/clients/jedis/resps/StreamEntryDeletionResult.java new file mode 100644 index 0000000000..974f8f3476 --- /dev/null +++ b/src/main/java/redis/clients/jedis/resps/StreamEntryDeletionResult.java @@ -0,0 +1,86 @@ +package redis.clients.jedis.resps; + +/** + * Represents the result of a stream entry deletion operation for XDELEX and XACKDEL commands. + *
    + *
  • NOT_FOUND (-1): ID doesn't exist in stream
  • + *
  • DELETED (1): Entry was deleted/acknowledged and deleted
  • + *
  • ACKNOWLEDGED_NOT_DELETED (2): Entry was acknowledged but not deleted (still has dangling + * references)
  • + *
+ */ +public enum StreamEntryDeletionResult { + + /** + * The stream entry ID doesn't exist in the stream. + *

+ * Returned when trying to delete/acknowledge a non-existent entry. + *

+ */ + NOT_FOUND(-1), + + /** + * The entry was successfully deleted/acknowledged and deleted. + *

+ * This is the typical successful case. + *

+ */ + DELETED(1), + + /** + * The entry was acknowledged but not deleted because it still has dangling references in other + * consumer groups' pending entry lists. + */ + ACKNOWLEDGED_NOT_DELETED(2); + + private final int code; + + StreamEntryDeletionResult(int code) { + this.code = code; + } + + /** + * Gets the numeric code returned by Redis for this result. + * @return the numeric code (-1, 1, or 2) + */ + public int getCode() { + return code; + } + + /** + * Creates a StreamEntryDeletionResult from the numeric code returned by Redis. + * @param code the numeric code from Redis + * @return the corresponding StreamEntryDeletionResult + * @throws IllegalArgumentException if the code is not recognized + */ + public static StreamEntryDeletionResult fromCode(int code) { + switch (code) { + case -1: + return NOT_FOUND; + case 1: + return DELETED; + case 2: + return ACKNOWLEDGED_NOT_DELETED; + default: + throw new IllegalArgumentException("Unknown stream entry deletion result code: " + code); + } + } + + /** + * Creates a StreamEntryDeletionResult from a Long value returned by Redis. + * @param value the Long value from Redis + * @return the corresponding StreamEntryDeletionResult + * @throws IllegalArgumentException if the value is null or not recognized + */ + public static StreamEntryDeletionResult fromLong(Long value) { + if (value == null) { + throw new IllegalArgumentException("Stream entry deletion result value cannot be null"); + } + return fromCode(value.intValue()); + } + + @Override + public String toString() { + return name() + "(" + code + ")"; + } +} diff --git a/src/test/java/redis/clients/jedis/commands/jedis/StreamsBinaryCommandsTest.java b/src/test/java/redis/clients/jedis/commands/jedis/StreamsBinaryCommandsTest.java index e53b4d87ba..63ba4111df 100644 --- a/src/test/java/redis/clients/jedis/commands/jedis/StreamsBinaryCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/jedis/StreamsBinaryCommandsTest.java @@ -1,16 +1,20 @@ package redis.clients.jedis.commands.jedis; +import io.redis.test.annotations.SinceRedisVersion; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedClass; import org.junit.jupiter.params.provider.MethodSource; import redis.clients.jedis.RedisProtocol; import redis.clients.jedis.StreamEntryID; +import redis.clients.jedis.args.StreamDeletionPolicy; import redis.clients.jedis.exceptions.JedisDataException; import redis.clients.jedis.params.XAddParams; import redis.clients.jedis.params.XReadGroupParams; import redis.clients.jedis.params.XReadParams; +import redis.clients.jedis.params.XTrimParams; import redis.clients.jedis.resps.StreamEntryBinary; +import redis.clients.jedis.resps.StreamEntryDeletionResult; import java.util.ArrayList; import java.util.HashMap; @@ -21,7 +25,10 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasSize; import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static redis.clients.jedis.StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY; import static redis.clients.jedis.util.StreamEntryBinaryListMatcher.equalsStreamEntries; @@ -253,4 +260,269 @@ public void xreadGroupBinaryAsMapMultipleStreams() { assertThat(actualEntries.get(STREAM_KEY_2), equalsStreamEntries(stream2Entries)); } + // ========== XACKDEL Command Tests ========== + + @Test + @SinceRedisVersion("8.1.240") + public void testXackdel() { + // Add a message to the stream + byte[] messageId = jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + assertNotNull(messageId); + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + + // Read the message with consumer group to add it to PEL + Map streams = offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY); + List>> messages = jedis.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(1), streams); + + assertEquals(1, messages.size()); + assertEquals(1, messages.get(0).getValue().size()); + byte[] readMessageId = messages.get(0).getValue().get(0).getID().toString().getBytes(); + + // Test XACKDEL - should acknowledge and delete the message + List results = jedis.xackdel(STREAM_KEY_1, GROUP_NAME, readMessageId); + assertThat(results, hasSize(1)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); + + // Verify message is deleted from stream + assertEquals(0L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void testXackdelWithTrimMode() { + // Add multiple messages + jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("2-0"), HASH_2); + assertEquals(2L, jedis.xlen(STREAM_KEY_1)); + + // Read the messages with consumer group + Map streams = offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY); + List>> messages = jedis.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(2), streams); + + assertEquals(1, messages.size()); + assertEquals(2, messages.get(0).getValue().size()); + + // Test XACKDEL with KEEP_REFERENCES mode + byte[] readId1 = messages.get(0).getValue().get(0).getID().toString().getBytes(); + List results = jedis.xackdel(STREAM_KEY_1, GROUP_NAME, StreamDeletionPolicy.KEEP_REFERENCES, readId1); + assertThat(results, hasSize(1)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); + + // Verify one message is deleted + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void testXackdelUnreadMessages() { + // Add test entries but don't read them + byte[] id1 = jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + + // Test XACKDEL on unread messages - should return NOT_FOUND for PEL + List results = jedis.xackdel(STREAM_KEY_1, GROUP_NAME, id1); + + assertThat(results, hasSize(1)); + // Should return NOT_FOUND because message was never read by the consumer group + assertEquals(StreamEntryDeletionResult.NOT_FOUND, results.get(0)); + + // Stream should still contain the message + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void testXackdelMultipleMessages() { + // Add multiple messages + jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("2-0"), HASH_2); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("3-0"), HASH_1); + assertEquals(3L, jedis.xlen(STREAM_KEY_1)); + + // Read the messages with consumer group + Map streams = offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY); + List>> messages = jedis.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3), streams); + + assertEquals(1, messages.size()); + assertEquals(3, messages.get(0).getValue().size()); + + // Test XACKDEL with multiple IDs + byte[] readId1 = messages.get(0).getValue().get(0).getID().toString().getBytes(); + byte[] readId2 = messages.get(0).getValue().get(1).getID().toString().getBytes(); + List results = jedis.xackdel(STREAM_KEY_1, GROUP_NAME, readId1, readId2); + assertThat(results, hasSize(2)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(1)); + + // Verify two messages are deleted + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + } + + // ========== XDELEX Command Tests ========== + + @Test + @SinceRedisVersion("8.1.240") + public void testXdelex() { + // Add test entries + byte[] id1 = jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + byte[] id2 = jedis.xadd(STREAM_KEY_1, new XAddParams().id("2-0"), HASH_2); + assertEquals(2L, jedis.xlen(STREAM_KEY_1)); + + // Test basic XDELEX without parameters (should behave like XDEL with KEEP_REFERENCES) + List results = jedis.xdelex(STREAM_KEY_1, id1); + assertThat(results, hasSize(1)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); + + // Verify entry is deleted from stream + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void testXdelexWithTrimMode() { + // Add test entries + byte[] id1 = jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("2-0"), HASH_2); + + // Test XDELEX with DELETE_REFERENCES mode + List results = jedis.xdelex(STREAM_KEY_1, StreamDeletionPolicy.DELETE_REFERENCES, id1); + assertThat(results, hasSize(1)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); + + // Verify entry is deleted from stream + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void testXdelexMultipleEntries() { + // Add test entries + byte[] id1 = jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("2-0"), HASH_2); + byte[] id3 = jedis.xadd(STREAM_KEY_1, new XAddParams().id("3-0"), HASH_1); + assertEquals(3L, jedis.xlen(STREAM_KEY_1)); + + // Test XDELEX with multiple IDs + List results = jedis.xdelex(STREAM_KEY_1, id1, id3); + assertThat(results, hasSize(2)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(1)); + + // Verify two entries are deleted + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void testXdelexNonExistentEntries() { + // Add one entry + byte[] id1 = jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + + // Test XDELEX with mix of existing and non-existent IDs + byte[] nonExistentId = "999-0".getBytes(); + List results = jedis.xdelex(STREAM_KEY_1, id1, nonExistentId); + assertThat(results, hasSize(2)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); // Existing entry + assertEquals(StreamEntryDeletionResult.NOT_FOUND, results.get(1)); // Non-existent entry + + // Verify existing entry is deleted + assertEquals(0L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void testXdelexWithConsumerGroups() { + // Add test entries + jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("2-0"), HASH_2); + assertEquals(2L, jedis.xlen(STREAM_KEY_1)); + + // Read messages with consumer group to add them to PEL + Map streams = offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY); + List>> messages = jedis.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(2), streams); + + assertEquals(1, messages.size()); + assertEquals(2, messages.get(0).getValue().size()); + + // Acknowledge only the first message + byte[] readId1 = messages.get(0).getValue().get(0).getID().toString().getBytes(); + byte[] readId2 = messages.get(0).getValue().get(1).getID().toString().getBytes(); + jedis.xack(STREAM_KEY_1, GROUP_NAME, readId1); + + // Test XDELEX with ACKNOWLEDGED mode - should only delete acknowledged entries + List results = jedis.xdelex(STREAM_KEY_1, StreamDeletionPolicy.ACKNOWLEDGED, readId1, readId2); + assertThat(results, hasSize(2)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); // id1 was acknowledged + assertEquals(StreamEntryDeletionResult.ACKNOWLEDGED_NOT_DELETED, results.get(1)); // id2 not acknowledged + + // Verify only acknowledged entry was deleted + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void testXdelexEmptyStream() { + // Test XDELEX on empty stream + byte[] nonExistentId = "1-0".getBytes(); + List results = jedis.xdelex(STREAM_KEY_1, nonExistentId); + assertThat(results, hasSize(1)); + assertEquals(StreamEntryDeletionResult.NOT_FOUND, results.get(0)); + } + + // ========== XTRIM Command Tests with trimmingMode ========== + + @Test + @SinceRedisVersion("8.1.240") + public void testXtrimWithKeepReferences() { + // Add test entries + for (int i = 1; i <= 5; i++) { + jedis.xadd(STREAM_KEY_1, new XAddParams().id(i + "-0"), HASH_1); + } + assertEquals(5L, jedis.xlen(STREAM_KEY_1)); + + // Read messages with consumer group to create PEL entries + Map streams = offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroupBinary(GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3), streams); + + // Test XTRIM with KEEP_REFERENCES mode - should preserve PEL references + long trimmed = jedis.xtrim(STREAM_KEY_1, XTrimParams.xTrimParams().maxLen(3).trimmingMode( + StreamDeletionPolicy.KEEP_REFERENCES)); + assertEquals(2L, trimmed); // Should trim 2 entries + assertEquals(3L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void testXtrimWithAcknowledged() { + // Add test entries + for (int i = 1; i <= 5; i++) { + jedis.xadd(STREAM_KEY_1, new XAddParams().id(i + "-0"), HASH_1); + } + assertEquals(5L, jedis.xlen(STREAM_KEY_1)); + + // Read messages with consumer group + Map streams = offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY); + List>> messages = jedis.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3), streams); + + assertEquals(1, messages.size()); + assertEquals(3, messages.get(0).getValue().size()); + + // Acknowledge only the first 2 messages + byte[] readId1 = messages.get(0).getValue().get(0).getID().toString().getBytes(); + byte[] readId2 = messages.get(0).getValue().get(1).getID().toString().getBytes(); + jedis.xack(STREAM_KEY_1, GROUP_NAME, readId1, readId2); + + // Test XTRIM with ACKNOWLEDGED mode - should only trim acknowledged entries + long trimmed = jedis.xtrim(STREAM_KEY_1, XTrimParams.xTrimParams().maxLen(3).trimmingMode( + StreamDeletionPolicy.ACKNOWLEDGED)); + // The exact behavior depends on implementation, but it should respect acknowledgment status + assertTrue(trimmed >= 0); + assertTrue(jedis.xlen(STREAM_KEY_1) <= 5); // Should not exceed original length + } + } diff --git a/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java index 1a3763b259..72322e6988 100644 --- a/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java @@ -36,6 +36,7 @@ import redis.clients.jedis.exceptions.JedisException; import redis.clients.jedis.params.*; import redis.clients.jedis.resps.*; +import redis.clients.jedis.args.StreamDeletionPolicy; import redis.clients.jedis.util.RedisVersionUtil; import redis.clients.jedis.util.SafeEncoder; @@ -197,6 +198,316 @@ public void xaddParamsId() { assertNotNull(id); } + @Test + @SinceRedisVersion("8.1.240") + public void xaddWithTrimmingModeKeepReferences() { + String streamKey = "xadd-trim-keep-ref-stream"; + String groupName = "test-group"; + String consumerName = "test-consumer"; + Map map = singletonMap("field", "value"); + + // Add initial entries to the stream + for (int i = 1; i <= 5; i++) { + jedis.xadd(streamKey, XAddParams.xAddParams().id(new StreamEntryID(i + "-0")), map); + } + assertEquals(5L, jedis.xlen(streamKey)); + + // Create consumer group and read messages to create PEL entries + jedis.xgroupCreate(streamKey, groupName, new StreamEntryID("0-0"), false); + Map streamQuery = singletonMap(streamKey, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroup(groupName, consumerName, XReadGroupParams.xReadGroupParams().count(3), streamQuery); + + // Verify PEL has entries + List pendingBefore = jedis.xpending(streamKey, groupName, XPendingParams.xPendingParams().count(10)); + assertEquals(3, pendingBefore.size()); + + // Add new entry with maxLen=3 and KEEP_REFERENCES mode + StreamEntryID newId = jedis.xadd(streamKey, XAddParams.xAddParams() + .id(new StreamEntryID("6-0")) + .maxLen(3) + .trimmingMode(StreamDeletionPolicy.KEEP_REFERENCES), map); + assertNotNull(newId); + + // Stream should be trimmed to 3 entries + assertEquals(3L, jedis.xlen(streamKey)); + + // PEL references should be preserved even though entries were trimmed + List pendingAfter = jedis.xpending(streamKey, groupName, XPendingParams.xPendingParams().count(10)); + assertEquals(3, pendingAfter.size()); // PEL entries should still exist + } + + @Test + @SinceRedisVersion("8.1.240") + public void xaddWithTrimmingModeDeleteReferences() { + String streamKey = "xadd-trim-del-ref-stream"; + String groupName = "test-group"; + String consumerName = "test-consumer"; + Map map = singletonMap("field", "value"); + + // Add initial entries to the stream + for (int i = 1; i <= 5; i++) { + jedis.xadd(streamKey, XAddParams.xAddParams().id(new StreamEntryID(i + "-0")), map); + } + assertEquals(5L, jedis.xlen(streamKey)); + + // Create consumer group and read messages to create PEL entries + jedis.xgroupCreate(streamKey, groupName, new StreamEntryID("0-0"), false); + Map streamQuery = singletonMap(streamKey, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroup(groupName, consumerName, XReadGroupParams.xReadGroupParams().count(3), streamQuery); + + // Verify PEL has entries + List pendingBefore = jedis.xpending(streamKey, groupName, XPendingParams.xPendingParams().count(10)); + assertEquals(3, pendingBefore.size()); + + // Add new entry with maxLen=3 and DELETE_REFERENCES mode + StreamEntryID newId = jedis.xadd(streamKey, XAddParams.xAddParams() + .id(new StreamEntryID("6-0")) + .maxLen(3) + .trimmingMode(StreamDeletionPolicy.DELETE_REFERENCES), map); + assertNotNull(newId); + + // Stream should be trimmed to 3 entries + assertEquals(3L, jedis.xlen(streamKey)); + + // PEL references should be removed for trimmed entries + List pendingAfter = jedis.xpending(streamKey, groupName, XPendingParams.xPendingParams().count(10)); + // Only entries that still exist in the stream should remain in PEL + assertTrue(pendingAfter.size() <= 3); + } + + @Test + @SinceRedisVersion("8.1.240") + public void xaddWithTrimmingModeAcknowledged() { + String streamKey = "xadd-trim-acked-stream"; + String groupName = "test-group"; + String consumerName = "test-consumer"; + Map map = singletonMap("field", "value"); + + // Add initial entries to the stream + for (int i = 1; i <= 5; i++) { + jedis.xadd(streamKey, XAddParams.xAddParams().id(new StreamEntryID(i + "-0")), map); + } + assertEquals(5L, jedis.xlen(streamKey)); + + // Create consumer group and read messages + jedis.xgroupCreate(streamKey, groupName, new StreamEntryID("0-0"), false); + Map streamQuery = singletonMap(streamKey, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + List>> messages = jedis.xreadGroup(groupName, consumerName, + XReadGroupParams.xReadGroupParams().count(3), streamQuery); + + // Acknowledge the first 2 messages + StreamEntryID id1 = messages.get(0).getValue().get(0).getID(); + StreamEntryID id2 = messages.get(0).getValue().get(1).getID(); + jedis.xack(streamKey, groupName, id1, id2); + + // Verify PEL state + List pendingBefore = jedis.xpending(streamKey, groupName, XPendingParams.xPendingParams().count(10)); + assertEquals(1, pendingBefore.size()); // Only 1 unacknowledged message + + // Add new entry with maxLen=3 and ACKNOWLEDGED mode + StreamEntryID newId = jedis.xadd(streamKey, XAddParams.xAddParams() + .id(new StreamEntryID("6-0")) + .maxLen(3) + .trimmingMode(StreamDeletionPolicy.ACKNOWLEDGED), map); + assertNotNull(newId); + + // Stream length should respect acknowledgment status + long streamLen = jedis.xlen(streamKey); + assertTrue(streamLen >= 3); // Should not trim unacknowledged entries aggressively + + // PEL should still contain unacknowledged entries + List pendingAfter = jedis.xpending(streamKey, groupName, XPendingParams.xPendingParams().count(10)); + assertFalse(pendingAfter.isEmpty()); // Unacknowledged entries should remain + } + + @Test + @SinceRedisVersion("8.1.240") + public void xaddWithMinIdTrimmingModeKeepReferences() { + String streamKey = "xadd-minid-keep-ref-stream"; + String groupName = "test-group"; + String consumerName = "test-consumer"; + Map map = singletonMap("field", "value"); + + // Add initial entries with specific IDs + for (int i = 1; i <= 5; i++) { + jedis.xadd(streamKey, XAddParams.xAddParams().id(new StreamEntryID("0-" + i)), map); + } + assertEquals(5L, jedis.xlen(streamKey)); + + // Create consumer group and read messages to create PEL entries + jedis.xgroupCreate(streamKey, groupName, new StreamEntryID("0-0"), false); + Map streamQuery = singletonMap(streamKey, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroup(groupName, consumerName, XReadGroupParams.xReadGroupParams().count(3), streamQuery); + + // Verify PEL has entries + List pendingBefore = jedis.xpending(streamKey, groupName, XPendingParams.xPendingParams().count(10)); + assertEquals(3, pendingBefore.size()); + + // Add new entry with minId="0-3" and KEEP_REFERENCES mode (should trim entries < 0-3) + StreamEntryID newId = jedis.xadd(streamKey, XAddParams.xAddParams() + .id(new StreamEntryID("0-6")) + .minId("0-3") + .trimmingMode(StreamDeletionPolicy.KEEP_REFERENCES), map); + assertNotNull(newId); + + // Stream should have entries >= 0-3 plus the new entry + long streamLen = jedis.xlen(streamKey); + assertTrue(streamLen >= 3); // Should keep entries 0-3, 0-4, 0-5, 0-6 + + // PEL references should be preserved even for trimmed entries + List pendingAfter = jedis.xpending(streamKey, groupName, XPendingParams.xPendingParams().count(10)); + assertEquals(3, pendingAfter.size()); // PEL entries should still exist + } + + @Test + @SinceRedisVersion("8.1.240") + public void xaddWithMinIdTrimmingModeDeleteReferences() { + String streamKey = "xadd-minid-del-ref-stream"; + String groupName = "test-group"; + String consumerName = "test-consumer"; + Map map = singletonMap("field", "value"); + + // Add initial entries with specific IDs + for (int i = 1; i <= 5; i++) { + jedis.xadd(streamKey, XAddParams.xAddParams().id(new StreamEntryID("0-" + i)), map); + } + assertEquals(5L, jedis.xlen(streamKey)); + + // Create consumer group and read messages to create PEL entries + jedis.xgroupCreate(streamKey, groupName, new StreamEntryID("0-0"), false); + Map streamQuery = singletonMap(streamKey, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroup(groupName, consumerName, XReadGroupParams.xReadGroupParams().count(3), streamQuery); + + // Verify PEL has entries + List pendingBefore = jedis.xpending(streamKey, groupName, XPendingParams.xPendingParams().count(10)); + assertEquals(3, pendingBefore.size()); + + // Add new entry with minId="0-3" and DELETE_REFERENCES mode + StreamEntryID newId = jedis.xadd(streamKey, XAddParams.xAddParams() + .id(new StreamEntryID("0-6")) + .minId("0-3") + .trimmingMode(StreamDeletionPolicy.DELETE_REFERENCES), map); + assertNotNull(newId); + + // Stream should have entries >= 0-3 plus the new entry + long streamLen = jedis.xlen(streamKey); + assertTrue(streamLen >= 3); + + // PEL references should be removed for trimmed entries (0-1, 0-2) + List pendingAfter = jedis.xpending(streamKey, groupName, XPendingParams.xPendingParams().count(10)); + // Only entries that still exist in the stream should remain in PEL + assertTrue(pendingAfter.size() <= pendingBefore.size()); + } + + @Test + @SinceRedisVersion("8.1.240") + public void xaddWithApproximateTrimmingAndTrimmingMode() { + String streamKey = "xadd-approx-trim-stream"; + String groupName = "test-group"; + String consumerName = "test-consumer"; + Map map = singletonMap("field", "value"); + + // Add initial entries + for (int i = 1; i <= 10; i++) { + jedis.xadd(streamKey, XAddParams.xAddParams().id(new StreamEntryID(i + "-0")), map); + } + assertEquals(10L, jedis.xlen(streamKey)); + + // Create consumer group and read messages + jedis.xgroupCreate(streamKey, groupName, new StreamEntryID("0-0"), false); + Map streamQuery = singletonMap(streamKey, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroup(groupName, consumerName, XReadGroupParams.xReadGroupParams().count(5), streamQuery); + + // Add new entry with approximate trimming and KEEP_REFERENCES mode + StreamEntryID newId = jedis.xadd(streamKey, XAddParams.xAddParams() + .id(new StreamEntryID("11-0")) + .maxLen(5) + .approximateTrimming() + .trimmingMode(StreamDeletionPolicy.KEEP_REFERENCES), map); + assertNotNull(newId); + + // With approximate trimming, the exact length may vary but should be around the target + long streamLen = jedis.xlen(streamKey); + assertTrue(streamLen >= 5); // Should be approximately 5, but may be more due to approximation + + // PEL should preserve references + List pendingAfter = jedis.xpending(streamKey, groupName, XPendingParams.xPendingParams().count(10)); + assertEquals(5, pendingAfter.size()); // All read messages should remain in PEL + } + + @Test + @SinceRedisVersion("8.1.240") + public void xaddWithExactTrimmingAndTrimmingMode() { + String streamKey = "xadd-exact-trim-mode-stream"; + String groupName = "test-group"; + String consumerName = "test-consumer"; + Map map = singletonMap("field", "value"); + + // Add initial entries + for (int i = 1; i <= 5; i++) { + jedis.xadd(streamKey, XAddParams.xAddParams().id(new StreamEntryID(i + "-0")), map); + } + assertEquals(5L, jedis.xlen(streamKey)); + + // Create consumer group and read messages + jedis.xgroupCreate(streamKey, groupName, new StreamEntryID("0-0"), false); + Map streamQuery = singletonMap(streamKey, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroup(groupName, consumerName, XReadGroupParams.xReadGroupParams().count(3), streamQuery); + + // Add new entry with exact trimming and DELETE_REFERENCES mode + StreamEntryID newId = jedis.xadd(streamKey, XAddParams.xAddParams() + .id(new StreamEntryID("6-0")) + .maxLen(3) + .exactTrimming() + .trimmingMode(StreamDeletionPolicy.DELETE_REFERENCES), map); + assertNotNull(newId); + + // With exact trimming, stream should be exactly 3 entries + assertEquals(3L, jedis.xlen(streamKey)); + + // PEL references should be cleaned up for trimmed entries + List pendingAfter = jedis.xpending(streamKey, groupName, XPendingParams.xPendingParams().count(10)); + // Only entries that still exist in the stream should remain in PEL + assertTrue(pendingAfter.size() <= 3); + } + + @Test + @SinceRedisVersion("8.1.240") + public void xaddWithLimitAndTrimmingMode() { + String streamKey = "xadd-limit-trim-mode-stream"; + String groupName = "test-group"; + String consumerName = "test-consumer"; + Map map = singletonMap("field", "value"); + + // Add initial entries + for (int i = 1; i <= 10; i++) { + jedis.xadd(streamKey, XAddParams.xAddParams().id(new StreamEntryID(i + "-0")), map); + } + assertEquals(10L, jedis.xlen(streamKey)); + + // Create consumer group and read messages + jedis.xgroupCreate(streamKey, groupName, new StreamEntryID("0-0"), false); + Map streamQuery = singletonMap(streamKey, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroup(groupName, consumerName, XReadGroupParams.xReadGroupParams().count(5), streamQuery); + + // Add new entry with limit and KEEP_REFERENCES mode (limit requires approximate trimming) + StreamEntryID newId = jedis.xadd(streamKey, XAddParams.xAddParams() + .id(new StreamEntryID("11-0")) + .maxLen(5) + .approximateTrimming() // Required for limit to work + .limit(2) // Limit the number of entries to examine for trimming + .trimmingMode(StreamDeletionPolicy.KEEP_REFERENCES), map); + assertNotNull(newId); + + // With limit, trimming may be less aggressive + long streamLen = jedis.xlen(streamKey); + assertTrue(streamLen >= 5); // Should be at least 5, but may be more due to limit + + // PEL should preserve references + List pendingAfter = jedis.xpending(streamKey, groupName, XPendingParams.xPendingParams().count(10)); + assertEquals(5, pendingAfter.size()); // All read messages should remain in PEL + } + @Test public void xdel() { Map map1 = new HashMap<>(); diff --git a/src/test/java/redis/clients/jedis/commands/unified/StreamsBinaryCommandsTestBase.java b/src/test/java/redis/clients/jedis/commands/unified/StreamsBinaryCommandsTestBase.java index 816dd8e6ac..2903a70c3b 100644 --- a/src/test/java/redis/clients/jedis/commands/unified/StreamsBinaryCommandsTestBase.java +++ b/src/test/java/redis/clients/jedis/commands/unified/StreamsBinaryCommandsTestBase.java @@ -1,14 +1,18 @@ package redis.clients.jedis.commands.unified; +import io.redis.test.annotations.SinceRedisVersion; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import redis.clients.jedis.RedisProtocol; import redis.clients.jedis.StreamEntryID; +import redis.clients.jedis.args.StreamDeletionPolicy; import redis.clients.jedis.exceptions.JedisDataException; import redis.clients.jedis.params.XAddParams; import redis.clients.jedis.params.XReadGroupParams; import redis.clients.jedis.params.XReadParams; +import redis.clients.jedis.params.XTrimParams; import redis.clients.jedis.resps.StreamEntryBinary; +import redis.clients.jedis.resps.StreamEntryDeletionResult; import java.util.ArrayList; import java.util.HashMap; @@ -19,7 +23,10 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasSize; import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static redis.clients.jedis.StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY; import static redis.clients.jedis.util.StreamEntryBinaryListMatcher.equalsStreamEntries; @@ -252,4 +259,293 @@ public void xreadGroupBinaryAsMapMultipleStreams() { assertThat(actualEntries.get(STREAM_KEY_2), equalsStreamEntries(stream2Entries)); } + // ========== XACKDEL Command Tests ========== + + @Test + @SinceRedisVersion("8.1.240") + public void testXackdel() { + setUpTestStream(); + + // Add a message to the stream + byte[] messageId = jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + assertNotNull(messageId); + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + + // Read the message with consumer group to add it to PEL + Map streams = offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY); + List>> messages = jedis.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(1), streams); + + assertEquals(1, messages.size()); + assertEquals(1, messages.get(0).getValue().size()); + byte[] readMessageId = messages.get(0).getValue().get(0).getID().toString().getBytes(); + + // Test XACKDEL - should acknowledge and delete the message + List results = jedis.xackdel(STREAM_KEY_1, GROUP_NAME, readMessageId); + assertThat(results, hasSize(1)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); + + // Verify message is deleted from stream + assertEquals(0L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void testXackdelWithTrimMode() { + setUpTestStream(); + + // Add multiple messages + jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("2-0"), HASH_2); + assertEquals(2L, jedis.xlen(STREAM_KEY_1)); + + // Read the messages with consumer group + Map streams = offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY); + List>> messages = jedis.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(2), streams); + + assertEquals(1, messages.size()); + assertEquals(2, messages.get(0).getValue().size()); + + // Test XACKDEL with KEEP_REFERENCES mode + byte[] readId1 = messages.get(0).getValue().get(0).getID().toString().getBytes(); + List results = jedis.xackdel(STREAM_KEY_1, GROUP_NAME, StreamDeletionPolicy.KEEP_REFERENCES, readId1); + assertThat(results, hasSize(1)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); + + // Verify one message is deleted + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void testXackdelUnreadMessages() { + setUpTestStream(); + + // Add test entries but don't read them + byte[] id1 = jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + + // Test XACKDEL on unread messages - should return NOT_FOUND for PEL + List results = jedis.xackdel(STREAM_KEY_1, GROUP_NAME, id1); + + assertThat(results, hasSize(1)); + // Should return NOT_FOUND because message was never read by the consumer group + assertEquals(StreamEntryDeletionResult.NOT_FOUND, results.get(0)); + + // Stream should still contain the message + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void testXackdelMultipleMessages() { + setUpTestStream(); + + // Add multiple messages + jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("2-0"), HASH_2); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("3-0"), HASH_1); + assertEquals(3L, jedis.xlen(STREAM_KEY_1)); + + // Read the messages with consumer group + Map streams = offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY); + List>> messages = jedis.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3), streams); + + assertEquals(1, messages.size()); + assertEquals(3, messages.get(0).getValue().size()); + + // Test XACKDEL with multiple IDs + byte[] readId1 = messages.get(0).getValue().get(0).getID().toString().getBytes(); + byte[] readId2 = messages.get(0).getValue().get(1).getID().toString().getBytes(); + List results = jedis.xackdel(STREAM_KEY_1, GROUP_NAME, readId1, readId2); + assertThat(results, hasSize(2)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(1)); + + // Verify two messages are deleted + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + } + + // ========== XDELEX Command Tests ========== + + @Test + @SinceRedisVersion("8.1.240") + public void testXdelex() { + setUpTestStream(); + + // Add test entries + byte[] id1 = jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("2-0"), HASH_2); + assertEquals(2L, jedis.xlen(STREAM_KEY_1)); + + // Test basic XDELEX without parameters (should behave like XDEL with KEEP_REFERENCES) + List results = jedis.xdelex(STREAM_KEY_1, id1); + assertThat(results, hasSize(1)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); + + // Verify entry is deleted from stream + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void testXdelexWithTrimMode() { + setUpTestStream(); + + // Add test entries + byte[] id1 = jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("2-0"), HASH_2); + + // Test XDELEX with DELETE_REFERENCES mode + List results = jedis.xdelex(STREAM_KEY_1, StreamDeletionPolicy.DELETE_REFERENCES, id1); + assertThat(results, hasSize(1)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); + + // Verify entry is deleted from stream + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void testXdelexMultipleEntries() { + setUpTestStream(); + + // Add test entries + byte[] id1 = jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("2-0"), HASH_2); + byte[] id3 = jedis.xadd(STREAM_KEY_1, new XAddParams().id("3-0"), HASH_1); + assertEquals(3L, jedis.xlen(STREAM_KEY_1)); + + // Test XDELEX with multiple IDs + List results = jedis.xdelex(STREAM_KEY_1, id1, id3); + assertThat(results, hasSize(2)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(1)); + + // Verify two entries are deleted + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void testXdelexNonExistentEntries() { + setUpTestStream(); + + // Add one entry + byte[] id1 = jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + + // Test XDELEX with mix of existing and non-existent IDs + byte[] nonExistentId = "999-0".getBytes(); + List results = jedis.xdelex(STREAM_KEY_1, id1, nonExistentId); + assertThat(results, hasSize(2)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); // Existing entry + assertEquals(StreamEntryDeletionResult.NOT_FOUND, results.get(1)); // Non-existent entry + + // Verify existing entry is deleted + assertEquals(0L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void testXdelexWithConsumerGroups() { + setUpTestStream(); + + // Add test entries + jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("2-0"), HASH_2); + assertEquals(2L, jedis.xlen(STREAM_KEY_1)); + + // Read messages with consumer group to add them to PEL + Map streams = offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY); + List>> messages = jedis.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(2), streams); + + assertEquals(1, messages.size()); + assertEquals(2, messages.get(0).getValue().size()); + + // Acknowledge only the first message + byte[] readId1 = messages.get(0).getValue().get(0).getID().toString().getBytes(); + byte[] readId2 = messages.get(0).getValue().get(1).getID().toString().getBytes(); + jedis.xack(STREAM_KEY_1, GROUP_NAME, readId1); + + // Test XDELEX with ACKNOWLEDGED mode - should only delete acknowledged entries + List results = jedis.xdelex(STREAM_KEY_1, StreamDeletionPolicy.ACKNOWLEDGED, readId1, readId2); + assertThat(results, hasSize(2)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); // id1 was acknowledged + assertEquals(StreamEntryDeletionResult.ACKNOWLEDGED_NOT_DELETED, results.get(1)); // id2 not acknowledged + + // Verify only acknowledged entry was deleted + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void testXdelexEmptyStream() { + setUpTestStream(); + + // Test XDELEX on empty stream + byte[] nonExistentId = "1-0".getBytes(); + List results = jedis.xdelex(STREAM_KEY_1, nonExistentId); + assertThat(results, hasSize(1)); + assertEquals(StreamEntryDeletionResult.NOT_FOUND, results.get(0)); + } + + // ========== XTRIM Command Tests with trimmingMode ========== + + @Test + @SinceRedisVersion("8.1.240") + public void testXtrimWithKeepReferences() { + setUpTestStream(); + + // Add test entries + for (int i = 1; i <= 5; i++) { + jedis.xadd(STREAM_KEY_1, new XAddParams().id(i + "-0"), HASH_1); + } + assertEquals(5L, jedis.xlen(STREAM_KEY_1)); + + // Read messages with consumer group to create PEL entries + Map streams = offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroupBinary(GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3), streams); + + // Test XTRIM with KEEP_REFERENCES mode - should preserve PEL references + long trimmed = jedis.xtrim(STREAM_KEY_1, XTrimParams.xTrimParams().maxLen(3).trimmingMode( + StreamDeletionPolicy.KEEP_REFERENCES)); + assertEquals(2L, trimmed); // Should trim 2 entries + assertEquals(3L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void testXtrimWithAcknowledged() { + setUpTestStream(); + + // Add test entries + for (int i = 1; i <= 5; i++) { + jedis.xadd(STREAM_KEY_1, new XAddParams().id(i + "-0"), HASH_1); + } + assertEquals(5L, jedis.xlen(STREAM_KEY_1)); + + // Read messages with consumer group + Map streams = offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY); + List>> messages = jedis.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3), streams); + + assertEquals(1, messages.size()); + assertEquals(3, messages.get(0).getValue().size()); + + // Acknowledge only the first 2 messages + byte[] readId1 = messages.get(0).getValue().get(0).getID().toString().getBytes(); + byte[] readId2 = messages.get(0).getValue().get(1).getID().toString().getBytes(); + jedis.xack(STREAM_KEY_1, GROUP_NAME, readId1, readId2); + + // Test XTRIM with ACKNOWLEDGED mode - should only trim acknowledged entries + long trimmed = jedis.xtrim(STREAM_KEY_1, XTrimParams.xTrimParams().maxLen(3).trimmingMode( + StreamDeletionPolicy.ACKNOWLEDGED)); + // The exact behavior depends on implementation, but it should respect acknowledgment status + assertTrue(trimmed >= 0); + assertTrue(jedis.xlen(STREAM_KEY_1) <= 5); // Should not exceed original length + } + } diff --git a/src/test/java/redis/clients/jedis/commands/unified/StreamsCommandsTestBase.java b/src/test/java/redis/clients/jedis/commands/unified/StreamsCommandsTestBase.java new file mode 100644 index 0000000000..86d59be8af --- /dev/null +++ b/src/test/java/redis/clients/jedis/commands/unified/StreamsCommandsTestBase.java @@ -0,0 +1,781 @@ +package redis.clients.jedis.commands.unified; + +import io.redis.test.annotations.SinceRedisVersion; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import redis.clients.jedis.RedisProtocol; +import redis.clients.jedis.StreamEntryID; +import redis.clients.jedis.args.StreamDeletionPolicy; +import redis.clients.jedis.exceptions.JedisDataException; +import redis.clients.jedis.params.XAddParams; +import redis.clients.jedis.params.XPendingParams; +import redis.clients.jedis.params.XReadGroupParams; +import redis.clients.jedis.params.XTrimParams; +import redis.clients.jedis.resps.StreamEntry; +import redis.clients.jedis.resps.StreamPendingEntry; +import redis.clients.jedis.resps.StreamEntryDeletionResult; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Collections.singletonMap; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public abstract class StreamsCommandsTestBase extends UnifiedJedisCommandsTestBase { + + protected static final String STREAM_KEY_1 = "{stream}-1"; + protected static final String STREAM_KEY_2 = "{stream}-2"; + protected static final String GROUP_NAME = "group-1"; + protected static final String CONSUMER_NAME = "consumer-1"; + + protected static final String FIELD_KEY_1 = "field-1"; + protected static final String VALUE_1 = "value-1"; + protected static final String FIELD_KEY_2 = "field-2"; + protected static final String VALUE_2 = "value-2"; + protected static final Map HASH_1 = singletonMap(FIELD_KEY_1, VALUE_1); + protected static final Map HASH_2 = singletonMap(FIELD_KEY_2, VALUE_2); + + public StreamsCommandsTestBase(RedisProtocol protocol) { + super(protocol); + } + + /** + * Populates a test stream with values using the i-0 format + * @param streamKey The stream key to populate + * @param count Number of entries to add + * @param map Map of field-value pairs for each entry + */ + protected void populateTestStreamWithValues(String streamKey, int count, + Map map) { + for (int i = 1; i <= count; i++) { + jedis.xadd(streamKey, XAddParams.xAddParams().id(new StreamEntryID(i + "-0")), map); + } + assertEquals(count, jedis.xlen(streamKey)); + } + + @BeforeEach + public void setUp() { + setUpTestClient(); + setUpTestStream(); + } + + protected void setUpTestClient() { + } + + public void setUpTestStream() { + jedis.del(STREAM_KEY_1); + jedis.del(STREAM_KEY_2); + try { + jedis.xgroupCreate(STREAM_KEY_1, GROUP_NAME, StreamEntryID.XGROUP_LAST_ENTRY, true); + } catch (JedisDataException e) { + if (!e.getMessage().contains("BUSYGROUP")) { + throw e; + } + } + try { + jedis.xgroupCreate(STREAM_KEY_2, GROUP_NAME, StreamEntryID.XGROUP_LAST_ENTRY, true); + } catch (JedisDataException e) { + if (!e.getMessage().contains("BUSYGROUP")) { + throw e; + } + } + } + + // ========== XADD Command Tests ========== + + @Test + public void xaddBasic() { + setUpTestStream(); + + // Test basic XADD with auto-generated ID + StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, StreamEntryID.NEW_ENTRY, HASH_1); + assertNotNull(id1); + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + + // Test XADD with multiple fields + Map multiFieldHash = new HashMap<>(); + multiFieldHash.put("field1", "value1"); + multiFieldHash.put("field2", "value2"); + multiFieldHash.put("field3", "value3"); + + StreamEntryID id2 = jedis.xadd(STREAM_KEY_1, StreamEntryID.NEW_ENTRY, multiFieldHash); + assertNotNull(id2); + assertTrue(id2.compareTo(id1) > 0); + assertEquals(2L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + public void xaddWithSpecificId() { + setUpTestStream(); + + // Test XADD with specific ID + StreamEntryID specificId = new StreamEntryID("1000-0"); + StreamEntryID resultId = jedis.xadd(STREAM_KEY_1, specificId, HASH_1); + assertEquals(specificId, resultId); + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + + // Test XADD with ID that must be greater than previous + StreamEntryID nextId = new StreamEntryID("1001-0"); + StreamEntryID resultId2 = jedis.xadd(STREAM_KEY_1, nextId, HASH_2); + assertEquals(nextId, resultId2); + assertEquals(2L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + public void xaddWithParams() { + setUpTestStream(); + + // Test XADD with maxLen parameter + populateTestStreamWithValues(STREAM_KEY_1, 5, HASH_1); + + // Add with maxLen=3, should trim to 3 entries + StreamEntryID id6 = jedis.xadd(STREAM_KEY_1, + XAddParams.xAddParams().id(new StreamEntryID("6-0")).maxLen(3), HASH_2); + assertNotNull(id6); + assertEquals(3L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + public void xaddErrorCases() { + setUpTestStream(); + + // Test XADD with empty hash should fail + try { + Map emptyHash = new HashMap<>(); + jedis.xadd(STREAM_KEY_1, StreamEntryID.NEW_ENTRY, emptyHash); + fail("Should throw JedisDataException for empty hash"); + } catch (JedisDataException expected) { + assertTrue(expected.getMessage().contains("wrong number of arguments")); + } + + // Test XADD with noMkStream on non-existent stream + StreamEntryID result = jedis.xadd("non-existent-stream", XAddParams.xAddParams().noMkStream(), + HASH_1); + assertNull(result); + } + + @ParameterizedTest + @CsvSource({ "KEEP_REFERENCES,3", "DELETE_REFERENCES,0" }) + @SinceRedisVersion("8.1.240") + public void xaddWithTrimmingMode(StreamDeletionPolicy trimMode, int expected) { + setUpTestStream(); + Map map = singletonMap("field", "value"); + + // Add initial entries to the stream + populateTestStreamWithValues(STREAM_KEY_1, 5, map); + + // Create consumer group and read messages to create PEL entries + Map streamQuery = singletonMap(STREAM_KEY_1, + StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroup(GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3), + streamQuery); + + // Verify PEL has entries + List pendingBefore = jedis.xpending(STREAM_KEY_1, GROUP_NAME, + XPendingParams.xPendingParams().count(10)); + assertEquals(3, pendingBefore.size()); + + // Add new entry with maxLen=3 and KEEP_REFERENCES mode + StreamEntryID newId = jedis.xadd(STREAM_KEY_1, + XAddParams.xAddParams().id(new StreamEntryID("6-0")).maxLen(3).trimmingMode(trimMode), map); + assertNotNull(newId); + + // Stream should be trimmed to 3 entries + assertEquals(3L, jedis.xlen(STREAM_KEY_1)); + + List pendingAfter = jedis.xpending(STREAM_KEY_1, GROUP_NAME, + XPendingParams.xPendingParams().count(10)); + assertEquals(expected, pendingAfter.size()); + } + + @Test + @SinceRedisVersion("8.1.240") + public void xaddWithTrimmingModeAcknowledged() { + setUpTestStream(); + Map map = singletonMap("field", "value"); + + // Add initial entries to the stream + populateTestStreamWithValues(STREAM_KEY_1, 5, map); + + // Create consumer group and read messages + Map streamQuery = singletonMap(STREAM_KEY_1, + StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + List>> messages = jedis.xreadGroup(GROUP_NAME, + CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3), streamQuery); + + // Acknowledge the first 2 messages + StreamEntryID id1 = messages.get(0).getValue().get(0).getID(); + StreamEntryID id2 = messages.get(0).getValue().get(1).getID(); + jedis.xack(STREAM_KEY_1, GROUP_NAME, id1, id2); + + // Verify PEL state + List pendingBefore = jedis.xpending(STREAM_KEY_1, GROUP_NAME, + XPendingParams.xPendingParams().count(10)); + assertEquals(1, pendingBefore.size()); // Only 1 unacknowledged message + + // Add new entry with maxLen=3 and ACKNOWLEDGED mode + StreamEntryID newId = jedis.xadd(STREAM_KEY_1, XAddParams.xAddParams() + .id(new StreamEntryID("6-0")).maxLen(3).trimmingMode(StreamDeletionPolicy.ACKNOWLEDGED), + map); + assertNotNull(newId); + + // Stream length should respect acknowledgment status + long streamLen = jedis.xlen(STREAM_KEY_1); + assertEquals(4, streamLen); // Should not trim unacknowledged entries aggressively + + // PEL should still contain unacknowledged entries + List pendingAfter = jedis.xpending(STREAM_KEY_1, GROUP_NAME, + XPendingParams.xPendingParams().count(10)); + assertEquals(1, pendingAfter.size()); // Unacknowledged entries should remain + } + + // ========== XTRIM Command Tests ========== + + @Test + public void xtrimBasic() { + setUpTestStream(); + + // Add test entries + populateTestStreamWithValues(STREAM_KEY_1, 5, HASH_1); + + // Test basic XTRIM with maxLen + long trimmed = jedis.xtrim(STREAM_KEY_1, 3, false); + assertEquals(2L, trimmed); // Should trim 2 entries + assertEquals(3L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + public void xtrimWithParams() { + setUpTestStream(); + + // Add test entries with specific IDs + populateTestStreamWithValues(STREAM_KEY_1, 5, HASH_1); + + // Test XTRIM with XTrimParams and exact trimming + long trimmed = jedis.xtrim(STREAM_KEY_1, XTrimParams.xTrimParams().maxLen(3).exactTrimming()); + assertEquals(2L, trimmed); + assertEquals(3L, jedis.xlen(STREAM_KEY_1)); + + // Test XTRIM with minId - use "4-0" since we have entries 1-0, 2-0, 3-0, 4-0, 5-0 + long trimmed2 = jedis.xtrim(STREAM_KEY_1, + XTrimParams.xTrimParams().minId("4-0").exactTrimming()); + assertEquals(1L, trimmed2); // Should trim entries with ID < 4-0 (only 3-0 should be trimmed) + assertEquals(2L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + public void xtrimApproximate() { + setUpTestStream(); + + // Add many entries + populateTestStreamWithValues(STREAM_KEY_1, 10, HASH_1); + + // Test approximate trimming + long trimmed = jedis.xtrim(STREAM_KEY_1, 5, true); + assertTrue(trimmed >= 0); // Approximate trimming may trim different amounts + assertTrue(jedis.xlen(STREAM_KEY_1) <= 10); // Should not exceed original length + } + + @ParameterizedTest + @CsvSource({ "KEEP_REFERENCES,3", "DELETE_REFERENCES,1" }) + @SinceRedisVersion("8.1.240") + public void xaddWithMinIdTrimmingMode(StreamDeletionPolicy trimMode, int expected) { + setUpTestStream(); + Map map = singletonMap("field", "value"); + + // Add initial entries with specific IDs + populateTestStreamWithValues(STREAM_KEY_1, 5, map); + + // Create consumer group and read messages to create PEL entries + Map streamQuery = singletonMap(STREAM_KEY_1, + StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroup(GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3), + streamQuery); + + // Verify PEL has entries + List pendingBefore = jedis.xpending(STREAM_KEY_1, GROUP_NAME, + XPendingParams.xPendingParams().count(10)); + assertEquals(3, pendingBefore.size()); + + // Add new entry with minId="3-0" and specified trimming mode + StreamEntryID newId = jedis.xadd(STREAM_KEY_1, + XAddParams.xAddParams().id(new StreamEntryID("6-0")).minId("3-0").trimmingMode(trimMode), + map); + assertNotNull(newId); + + // Stream should have entries >= 3-0 plus the new entry + long streamLen = jedis.xlen(STREAM_KEY_1); + assertTrue(streamLen >= 3); + + // Check PEL entries based on trimming mode + List pendingAfter = jedis.xpending(STREAM_KEY_1, GROUP_NAME, + XPendingParams.xPendingParams().count(10)); + assertEquals(expected, pendingAfter.size()); + } + + @Test + @SinceRedisVersion("8.1.240") + public void xaddWithApproximateTrimmingAndTrimmingMode() { + setUpTestStream(); + Map map = singletonMap("field", "value"); + + // Add initial entries + populateTestStreamWithValues(STREAM_KEY_1, 10, map); + + // Create consumer group and read messages + Map streamQuery = singletonMap(STREAM_KEY_1, + StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroup(GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(5), + streamQuery); + + // Add new entry with approximate trimming and KEEP_REFERENCES mode + StreamEntryID newId = jedis.xadd(STREAM_KEY_1, + XAddParams.xAddParams().id(new StreamEntryID("11-0")).maxLen(5).approximateTrimming() + .trimmingMode(StreamDeletionPolicy.KEEP_REFERENCES), + map); + assertNotNull(newId); + + // With approximate trimming, the exact length may vary but should be around the target + long streamLen = jedis.xlen(STREAM_KEY_1); + assertTrue(streamLen >= 5); // Should be approximately 5, but may be more due to approximation + + // PEL should preserve references + List pendingAfter = jedis.xpending(STREAM_KEY_1, GROUP_NAME, + XPendingParams.xPendingParams().count(10)); + assertEquals(5, pendingAfter.size()); // All read messages should remain in PEL + } + + @Test + @SinceRedisVersion("8.1.240") + public void xaddWithExactTrimmingAndTrimmingMode() { + setUpTestStream(); + Map map = singletonMap("field", "value"); + + // Add initial entries + populateTestStreamWithValues(STREAM_KEY_1, 5, map); + + // Create consumer group and read messages + Map streamQuery = singletonMap(STREAM_KEY_1, + StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroup(GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3), + streamQuery); + + // Add new entry with exact trimming and DELETE_REFERENCES mode + StreamEntryID newId = jedis.xadd(STREAM_KEY_1, + XAddParams.xAddParams().id(new StreamEntryID("6-0")).maxLen(3).exactTrimming() + .trimmingMode(StreamDeletionPolicy.DELETE_REFERENCES), + map); + assertNotNull(newId); + + // With exact trimming, stream should be exactly 3 entries + assertEquals(3L, jedis.xlen(STREAM_KEY_1)); + + // PEL references should be cleaned up for trimmed entries + List pendingAfter = jedis.xpending(STREAM_KEY_1, GROUP_NAME, + XPendingParams.xPendingParams().count(10)); + // Only entries that still exist in the stream should remain in PEL + assertTrue(pendingAfter.size() <= 3); + } + + @Test + @SinceRedisVersion("8.1.240") + public void xaddWithLimitAndTrimmingMode() { + setUpTestStream(); + Map map = singletonMap("field", "value"); + + // Add initial entries + populateTestStreamWithValues(STREAM_KEY_1, 10, map); + + Map streamQuery = singletonMap(STREAM_KEY_1, + StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroup(GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(5), + streamQuery); + + // Add new entry with limit and KEEP_REFERENCES mode (limit requires approximate trimming) + StreamEntryID newId = jedis.xadd(STREAM_KEY_1, + XAddParams.xAddParams().id(new StreamEntryID("11-0")).maxLen(5).approximateTrimming() // Required + // for + // limit + // to + // work + .limit(2) // Limit the number of entries to examine for trimming + .trimmingMode(StreamDeletionPolicy.KEEP_REFERENCES), + map); + assertNotNull(newId); + + // With limit, trimming may be less aggressive + long streamLen = jedis.xlen(STREAM_KEY_1); + assertTrue(streamLen >= 5); // Should be at least 5, but may be more due to limit + + // PEL should preserve references + List pendingAfter = jedis.xpending(STREAM_KEY_1, GROUP_NAME, + XPendingParams.xPendingParams().count(10)); + assertEquals(5, pendingAfter.size()); // All read messages should remain in PEL + } + + // ========== XACK Command Tests ========== + + @Test + public void xackBasic() { + setUpTestStream(); + + // Add a message to the stream + StreamEntryID messageId = jedis.xadd(STREAM_KEY_1, StreamEntryID.NEW_ENTRY, HASH_1); + assertNotNull(messageId); + + // Consumer group already created in setUpTestStream(), just read message + Map streams = singletonMap(STREAM_KEY_1, + StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + List>> messages = jedis.xreadGroup(GROUP_NAME, + CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(1), streams); + + assertEquals(1, messages.size()); + assertEquals(1, messages.get(0).getValue().size()); + StreamEntryID readMessageId = messages.get(0).getValue().get(0).getID(); + + // Test XACK + long acked = jedis.xack(STREAM_KEY_1, GROUP_NAME, readMessageId); + assertEquals(1L, acked); + } + + @Test + public void xackMultipleMessages() { + setUpTestStream(); + + // Add multiple messages + StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1); + StreamEntryID id2 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("2-0"), HASH_2); + + // Consumer group already created in setUpTestStream(), just read messages + Map streams = singletonMap(STREAM_KEY_1, + StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + List>> messages = jedis.xreadGroup(GROUP_NAME, + CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(2), streams); + + assertEquals(1, messages.size()); + assertEquals(2, messages.get(0).getValue().size()); + + // Test XACK with multiple IDs + StreamEntryID readId1 = messages.get(0).getValue().get(0).getID(); + StreamEntryID readId2 = messages.get(0).getValue().get(1).getID(); + long acked = jedis.xack(STREAM_KEY_1, GROUP_NAME, readId1, readId2); + assertEquals(2L, acked); + } + + @Test + public void xackNonExistentMessage() { + setUpTestStream(); + + // Consumer group already created in setUpTestStream() + // Test XACK with non-existent message ID + StreamEntryID nonExistentId = new StreamEntryID("999-0"); + long acked = jedis.xack(STREAM_KEY_1, GROUP_NAME, nonExistentId); + assertEquals(0L, acked); // Should return 0 for non-existent message + } + + // ========== XDEL Command Tests ========== + + @Test + public void xdelBasic() { + setUpTestStream(); + + // Add test entries + StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1); + StreamEntryID id2 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("2-0"), HASH_2); + assertEquals(2L, jedis.xlen(STREAM_KEY_1)); + + // Test XDEL with single ID + long deleted = jedis.xdel(STREAM_KEY_1, id1); + assertEquals(1L, deleted); + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + public void xdelMultipleEntries() { + setUpTestStream(); + + // Add test entries + StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1); + StreamEntryID id2 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("2-0"), HASH_2); + StreamEntryID id3 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("3-0"), HASH_1); + assertEquals(3L, jedis.xlen(STREAM_KEY_1)); + + // Test XDEL with multiple IDs + long deleted = jedis.xdel(STREAM_KEY_1, id1, id3); + assertEquals(2L, deleted); + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + public void xdelNonExistentEntries() { + setUpTestStream(); + + // Add one entry + StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1); + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + + // Test XDEL with mix of existing and non-existent IDs + StreamEntryID nonExistentId = new StreamEntryID("999-0"); + long deleted = jedis.xdel(STREAM_KEY_1, id1, nonExistentId); + assertEquals(1L, deleted); // Should only delete the existing entry + assertEquals(0L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + public void xdelEmptyStream() { + setUpTestStream(); + + // Test XDEL on empty stream + StreamEntryID nonExistentId = new StreamEntryID("1-0"); + long deleted = jedis.xdel(STREAM_KEY_1, nonExistentId); + assertEquals(0L, deleted); + } + + // ========== XACKDEL Command Tests ========== + + @Test + @SinceRedisVersion("8.1.240") + public void xackdelBasic() { + setUpTestStream(); + + // Add a message to the stream + StreamEntryID messageId = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1); + assertNotNull(messageId); + + // Consumer group already created in setUpTestStream(), read message + Map streams = singletonMap(STREAM_KEY_1, + StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + List>> messages = jedis.xreadGroup(GROUP_NAME, + CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(1), streams); + + assertEquals(1, messages.size()); + assertEquals(1, messages.get(0).getValue().size()); + StreamEntryID readMessageId = messages.get(0).getValue().get(0).getID(); + + // Test XACKDEL - should acknowledge and delete the message + List results = jedis.xackdel(STREAM_KEY_1, GROUP_NAME, + readMessageId); + assertThat(results, hasSize(1)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); + + // Verify message is deleted from stream + assertEquals(0L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void xackdelWithTrimMode() { + setUpTestStream(); + + // Add multiple messages + StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1); + StreamEntryID id2 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("2-0"), HASH_2); + + // Consumer group already created, read messages + Map streams = singletonMap(STREAM_KEY_1, + StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + List>> messages = jedis.xreadGroup(GROUP_NAME, + CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(2), streams); + + assertEquals(1, messages.size()); + assertEquals(2, messages.get(0).getValue().size()); + + // Test XACKDEL with KEEP_REFERENCES mode + StreamEntryID readId1 = messages.get(0).getValue().get(0).getID(); + List results = jedis.xackdel(STREAM_KEY_1, GROUP_NAME, + StreamDeletionPolicy.KEEP_REFERENCES, readId1); + assertThat(results, hasSize(1)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); + + // Verify one message is deleted + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void xackdelUnreadMessages() { + setUpTestStream(); + + // Add test entries but don't read them + StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1); + + // Test XACKDEL on unread messages - should return NOT_FOUND for PEL + List results = jedis.xackdel(STREAM_KEY_1, GROUP_NAME, id1); + + assertThat(results, hasSize(1)); + // Should return NOT_FOUND because message was never read by the consumer group + assertEquals(StreamEntryDeletionResult.NOT_FOUND, results.get(0)); + + // Stream should still contain the message + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void xackdelMultipleMessages() { + setUpTestStream(); + + // Add multiple messages + StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1); + StreamEntryID id2 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("2-0"), HASH_2); + StreamEntryID id3 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("3-0"), HASH_1); + + // Read all messages + Map streams = singletonMap(STREAM_KEY_1, + StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + List>> messages = jedis.xreadGroup(GROUP_NAME, + CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3), streams); + + assertEquals(1, messages.size()); + assertEquals(3, messages.get(0).getValue().size()); + + // Test XACKDEL with multiple IDs + StreamEntryID readId1 = messages.get(0).getValue().get(0).getID(); + StreamEntryID readId2 = messages.get(0).getValue().get(1).getID(); + List results = jedis.xackdel(STREAM_KEY_1, GROUP_NAME, readId1, + readId2); + assertThat(results, hasSize(2)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(1)); + + // Verify two messages are deleted + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + } + + // ========== XDELEX Command Tests ========== + + @Test + @SinceRedisVersion("8.1.240") + public void xdelexBasic() { + setUpTestStream(); + + // Add test entries + StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1); + StreamEntryID id2 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("2-0"), HASH_2); + assertEquals(2L, jedis.xlen(STREAM_KEY_1)); + + // Test basic XDELEX without parameters (should behave like XDEL with KEEP_REFERENCES) + List results = jedis.xdelex(STREAM_KEY_1, id1); + assertThat(results, hasSize(1)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); + + // Verify entry is deleted from stream + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void xdelexWithTrimMode() { + setUpTestStream(); + + // Add test entries + StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1); + StreamEntryID id2 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("2-0"), HASH_2); + + // Test XDELEX with DELETE_REFERENCES mode + List results = jedis.xdelex(STREAM_KEY_1, + StreamDeletionPolicy.DELETE_REFERENCES, id1); + assertThat(results, hasSize(1)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); + + // Verify entry is deleted from stream + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void xdelexMultipleEntries() { + setUpTestStream(); + + // Add test entries + StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1); + StreamEntryID id2 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("2-0"), HASH_2); + StreamEntryID id3 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("3-0"), HASH_1); + assertEquals(3L, jedis.xlen(STREAM_KEY_1)); + + // Test XDELEX with multiple IDs + List results = jedis.xdelex(STREAM_KEY_1, id1, id3); + assertThat(results, hasSize(2)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(1)); + + // Verify two entries are deleted + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void xdelexNonExistentEntries() { + setUpTestStream(); + + // Add one entry + StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1); + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + + // Test XDELEX with mix of existing and non-existent IDs + StreamEntryID nonExistentId = new StreamEntryID("999-0"); + List results = jedis.xdelex(STREAM_KEY_1, id1, nonExistentId); + assertThat(results, hasSize(2)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); // Existing entry + assertEquals(StreamEntryDeletionResult.NOT_FOUND, results.get(1)); // Non-existent entry + + // Verify existing entry is deleted + assertEquals(0L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void xdelexWithConsumerGroups() { + setUpTestStream(); + + // Add test entries + StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1); + StreamEntryID id2 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("2-0"), HASH_2); + + // Read messages to add them to PEL + Map streams = singletonMap(STREAM_KEY_1, + StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + List>> messages = jedis.xreadGroup(GROUP_NAME, + CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(2), streams); + + assertEquals(1, messages.size()); + assertEquals(2, messages.get(0).getValue().size()); + + // Acknowledge only the first message + StreamEntryID readId1 = messages.get(0).getValue().get(0).getID(); + StreamEntryID readId2 = messages.get(0).getValue().get(1).getID(); + jedis.xack(STREAM_KEY_1, GROUP_NAME, readId1); + + // Test XDELEX with ACKNOWLEDGED mode - should only delete acknowledged entries + List results = jedis.xdelex(STREAM_KEY_1, + StreamDeletionPolicy.ACKNOWLEDGED, readId1, readId2); + assertThat(results, hasSize(2)); + assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); // id1 was acknowledged + assertEquals(StreamEntryDeletionResult.ACKNOWLEDGED_NOT_DELETED, results.get(1)); // id2 not + // acknowledged + + // Verify only acknowledged entry was deleted + assertEquals(1L, jedis.xlen(STREAM_KEY_1)); + } + + @Test + @SinceRedisVersion("8.1.240") + public void xdelexEmptyStream() { + setUpTestStream(); + + // Test XDELEX on empty stream + StreamEntryID nonExistentId = new StreamEntryID("1-0"); + List results = jedis.xdelex(STREAM_KEY_1, nonExistentId); + assertThat(results, hasSize(1)); + assertEquals(StreamEntryDeletionResult.NOT_FOUND, results.get(0)); + } +} diff --git a/src/test/java/redis/clients/jedis/commands/unified/cluster/ClusterStreamsBinaryCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/cluster/ClusterStreamsBinaryCommandsTest.java index e651c1a4c1..086b3416b8 100644 --- a/src/test/java/redis/clients/jedis/commands/unified/cluster/ClusterStreamsBinaryCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/unified/cluster/ClusterStreamsBinaryCommandsTest.java @@ -1,15 +1,30 @@ package redis.clients.jedis.commands.unified.cluster; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedClass; import org.junit.jupiter.params.provider.MethodSource; +import redis.clients.jedis.DefaultJedisClientConfig; +import redis.clients.jedis.HostAndPorts; import redis.clients.jedis.RedisProtocol; import redis.clients.jedis.commands.unified.StreamsBinaryCommandsTestBase; +import redis.clients.jedis.util.EnabledOnCommandCondition; +import redis.clients.jedis.util.RedisVersionCondition; @ParameterizedClass @MethodSource("redis.clients.jedis.commands.CommandsTestsParameters#respVersions") public class ClusterStreamsBinaryCommandsTest extends StreamsBinaryCommandsTestBase { + @RegisterExtension + public RedisVersionCondition versionCondition = new RedisVersionCondition( + HostAndPorts.getStableClusterServers().get(0), + DefaultJedisClientConfig.builder().password("cluster").build()); + + @RegisterExtension + public EnabledOnCommandCondition enabledOnCommandCondition = new EnabledOnCommandCondition( + HostAndPorts.getStableClusterServers().get(0), + DefaultJedisClientConfig.builder().password("cluster").build()); + public ClusterStreamsBinaryCommandsTest(RedisProtocol protocol) { super(protocol); } diff --git a/src/test/java/redis/clients/jedis/commands/unified/cluster/ClusterStreamsCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/cluster/ClusterStreamsCommandsTest.java new file mode 100644 index 0000000000..b65b478621 --- /dev/null +++ b/src/test/java/redis/clients/jedis/commands/unified/cluster/ClusterStreamsCommandsTest.java @@ -0,0 +1,43 @@ +package redis.clients.jedis.commands.unified.cluster; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedClass; +import org.junit.jupiter.params.provider.MethodSource; +import redis.clients.jedis.DefaultJedisClientConfig; +import redis.clients.jedis.HostAndPorts; +import redis.clients.jedis.RedisProtocol; +import redis.clients.jedis.commands.unified.StreamsCommandsTestBase; +import redis.clients.jedis.util.EnabledOnCommandCondition; +import redis.clients.jedis.util.RedisVersionCondition; + +@ParameterizedClass +@MethodSource("redis.clients.jedis.commands.CommandsTestsParameters#respVersions") +public class ClusterStreamsCommandsTest extends StreamsCommandsTestBase { + + @RegisterExtension + public RedisVersionCondition versionCondition = new RedisVersionCondition( + HostAndPorts.getStableClusterServers().get(0), + DefaultJedisClientConfig.builder().password("cluster").build()); + + @RegisterExtension + public EnabledOnCommandCondition enabledOnCommandCondition = new EnabledOnCommandCondition( + HostAndPorts.getStableClusterServers().get(0), + DefaultJedisClientConfig.builder().password("cluster").build()); + + public ClusterStreamsCommandsTest(RedisProtocol protocol) { + super(protocol); + } + + @Override + protected void setUpTestClient() { + jedis = ClusterCommandsTestHelper.getCleanCluster(protocol); + } + + @AfterEach + public void tearDown() { + jedis.close(); + ClusterCommandsTestHelper.clearClusterData(); + } + +} diff --git a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledStreamsBinaryCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledStreamsBinaryCommandsTest.java index 93f4279b4a..b967e2a4e1 100644 --- a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledStreamsBinaryCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledStreamsBinaryCommandsTest.java @@ -1,15 +1,26 @@ package redis.clients.jedis.commands.unified.pooled; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedClass; import org.junit.jupiter.params.provider.MethodSource; import redis.clients.jedis.RedisProtocol; import redis.clients.jedis.commands.unified.StreamsBinaryCommandsTestBase; +import redis.clients.jedis.util.EnabledOnCommandCondition; +import redis.clients.jedis.util.RedisVersionCondition; @ParameterizedClass @MethodSource("redis.clients.jedis.commands.CommandsTestsParameters#respVersions") public class PooledStreamsBinaryCommandsTest extends StreamsBinaryCommandsTestBase { + @RegisterExtension + public RedisVersionCondition versionCondition = new RedisVersionCondition( + PooledCommandsTestHelper.nodeInfo); + + @RegisterExtension + public EnabledOnCommandCondition enabledOnCommandCondition = new EnabledOnCommandCondition( + PooledCommandsTestHelper.nodeInfo); + public PooledStreamsBinaryCommandsTest(RedisProtocol protocol) { super(protocol); } diff --git a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledStreamsCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledStreamsCommandsTest.java new file mode 100644 index 0000000000..24a3aaf889 --- /dev/null +++ b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledStreamsCommandsTest.java @@ -0,0 +1,39 @@ +package redis.clients.jedis.commands.unified.pooled; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedClass; +import org.junit.jupiter.params.provider.MethodSource; +import redis.clients.jedis.RedisProtocol; +import redis.clients.jedis.commands.unified.StreamsCommandsTestBase; +import redis.clients.jedis.util.EnabledOnCommandCondition; +import redis.clients.jedis.util.RedisVersionCondition; + +@ParameterizedClass +@MethodSource("redis.clients.jedis.commands.CommandsTestsParameters#respVersions") +public class PooledStreamsCommandsTest extends StreamsCommandsTestBase { + + @RegisterExtension + public RedisVersionCondition versionCondition = new RedisVersionCondition( + PooledCommandsTestHelper.nodeInfo); + + @RegisterExtension + public EnabledOnCommandCondition enabledOnCommandCondition = new EnabledOnCommandCondition( + PooledCommandsTestHelper.nodeInfo); + + public PooledStreamsCommandsTest(RedisProtocol protocol) { + super(protocol); + } + + @Override + public void setUpTestClient() { + jedis = PooledCommandsTestHelper.getPooled(protocol); + PooledCommandsTestHelper.clearData(); + } + + @AfterEach + public void tearDown() { + jedis.close(); + } + +} diff --git a/src/test/java/redis/clients/jedis/resps/StreamEntryDeletionResultTest.java b/src/test/java/redis/clients/jedis/resps/StreamEntryDeletionResultTest.java new file mode 100644 index 0000000000..ea6a4c0b53 --- /dev/null +++ b/src/test/java/redis/clients/jedis/resps/StreamEntryDeletionResultTest.java @@ -0,0 +1,51 @@ +package redis.clients.jedis.resps; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +public class StreamEntryDeletionResultTest { + + @Test + public void testFromCode() { + assertEquals(StreamEntryDeletionResult.NOT_FOUND, StreamEntryDeletionResult.fromCode(-1)); + assertEquals(StreamEntryDeletionResult.DELETED, StreamEntryDeletionResult.fromCode(1)); + assertEquals(StreamEntryDeletionResult.ACKNOWLEDGED_NOT_DELETED, + StreamEntryDeletionResult.fromCode(2)); + } + + @Test + public void testFromCodeInvalid() { + assertThrows(IllegalArgumentException.class, () -> StreamEntryDeletionResult.fromCode(0)); + assertThrows(IllegalArgumentException.class, () -> StreamEntryDeletionResult.fromCode(3)); + assertThrows(IllegalArgumentException.class, () -> StreamEntryDeletionResult.fromCode(-2)); + } + + @Test + public void testFromLong() { + assertEquals(StreamEntryDeletionResult.NOT_FOUND, StreamEntryDeletionResult.fromLong(-1L)); + assertEquals(StreamEntryDeletionResult.DELETED, StreamEntryDeletionResult.fromLong(1L)); + assertEquals(StreamEntryDeletionResult.ACKNOWLEDGED_NOT_DELETED, + StreamEntryDeletionResult.fromLong(2L)); + } + + @Test + public void testFromLongNull() { + assertThrows(IllegalArgumentException.class, () -> StreamEntryDeletionResult.fromLong(null)); + } + + @Test + public void testGetCode() { + assertEquals(-1, StreamEntryDeletionResult.NOT_FOUND.getCode()); + assertEquals(1, StreamEntryDeletionResult.DELETED.getCode()); + assertEquals(2, StreamEntryDeletionResult.ACKNOWLEDGED_NOT_DELETED.getCode()); + } + + @Test + public void testToString() { + assertEquals("NOT_FOUND(-1)", StreamEntryDeletionResult.NOT_FOUND.toString()); + assertEquals("DELETED(1)", StreamEntryDeletionResult.DELETED.toString()); + assertEquals("ACKNOWLEDGED_NOT_DELETED(2)", + StreamEntryDeletionResult.ACKNOWLEDGED_NOT_DELETED.toString()); + } +}