From ae2916aea91194679ac62d80d23f425003ad3e5f Mon Sep 17 00:00:00 2001 From: Gabriel Erzse Date: Mon, 19 Feb 2024 17:19:51 +0200 Subject: [PATCH 1/2] Add support for blocking XREAD and XREADGROUP Issues #237 and #255. --- .../CoreCommands/CoreCommandBuilder.cs | 75 ++++ src/NRedisStack/CoreCommands/CoreCommands.cs | 110 ++++++ .../DataTypes/RedisStreamEntries.cs | 25 ++ .../DataTypes/StreamSpecialIds.cs | 22 ++ .../CoreCommands/Literals/CommandArgs.cs | 4 + .../CoreCommands/Literals/Commands.cs | 2 + src/NRedisStack/ResponseParser.cs | 51 +++ .../Core Commands/CoreTests.cs | 338 ++++++++++++++++++ 8 files changed, 627 insertions(+) create mode 100644 src/NRedisStack/CoreCommands/DataTypes/RedisStreamEntries.cs create mode 100644 src/NRedisStack/CoreCommands/DataTypes/StreamSpecialIds.cs diff --git a/src/NRedisStack/CoreCommands/CoreCommandBuilder.cs b/src/NRedisStack/CoreCommands/CoreCommandBuilder.cs index 5f8f62e1..a343f3b2 100644 --- a/src/NRedisStack/CoreCommands/CoreCommandBuilder.cs +++ b/src/NRedisStack/CoreCommands/CoreCommandBuilder.cs @@ -109,6 +109,81 @@ public static SerializedCommand BRPopLPush(RedisKey source, RedisKey destination return new SerializedCommand(RedisCoreCommands.BRPOPLPUSH, args); } + public static SerializedCommand XRead(RedisKey[] keys, RedisValue[] positions, int? count, int? timeoutMilliseconds) + { + if (keys.Length == 0) + { + throw new ArgumentException("At least one key must be provided."); + } + + if (keys.Length != positions.Length) + { + throw new ArgumentException("The number of keys and positions must be the same."); + } + + List args = new List(); + + if (count != null) + { + args.Add(CoreArgs.COUNT); + args.Add(count); + } + + if (timeoutMilliseconds != null) + { + args.Add(CoreArgs.BLOCK); + args.Add(timeoutMilliseconds); + } + + args.Add(CoreArgs.STREAMS); + args.AddRange(keys.Cast()); + args.AddRange(positions.Cast()); + + return new SerializedCommand(RedisCoreCommands.XREAD, args); + } + + public static SerializedCommand XReadGroup(RedisValue groupName, RedisValue consumerName, RedisKey[] keys, RedisValue[] positions, int? count, int? timeoutMilliseconds, bool? noAcknowledge) + { + if (keys.Length == 0) + { + throw new ArgumentException("At least one key must be provided."); + } + + if (keys.Length != positions.Length) + { + throw new ArgumentException("The number of keys and positions must be the same."); + } + + List args = new List(); + + args.Add(CoreArgs.GROUP); + args.Add(groupName); + args.Add(consumerName); + + if (count != null) + { + args.Add(CoreArgs.COUNT); + args.Add(count); + } + + if (timeoutMilliseconds != null) + { + args.Add(CoreArgs.BLOCK); + args.Add(timeoutMilliseconds); + } + + if (noAcknowledge != null && noAcknowledge.Value) + { + args.Add(CoreArgs.NOACK); + } + + args.Add(CoreArgs.STREAMS); + args.AddRange(keys.Cast()); + args.AddRange(positions.Cast()); + + return new SerializedCommand(RedisCoreCommands.XREADGROUP, args); + } + private static SerializedCommand BlockingCommandWithKeysAndTimeout(String command, RedisKey[] keys, double timeout) { if (keys.Length == 0) diff --git a/src/NRedisStack/CoreCommands/CoreCommands.cs b/src/NRedisStack/CoreCommands/CoreCommands.cs index 514f3176..7a2a5ae4 100644 --- a/src/NRedisStack/CoreCommands/CoreCommands.cs +++ b/src/NRedisStack/CoreCommands/CoreCommands.cs @@ -370,5 +370,115 @@ public static bool ClientSetInfo(this IDatabase db, SetInfoAttr attr, string val var command = CoreCommandBuilder.BRPopLPush(source, destination, timeout); return db.Execute(command).ToRedisValue(); } + + /// + /// The XREAD command. + /// + /// Read data from one or multiple streams, only returning entries with an ID greater than an ID provided by the caller. + /// + /// The class where this extension method is applied. + /// Keys of the streams where to read from. + /// The positions from which to begin reading for each stream. See + /// for special Ids that can be used. + /// The maximum number of messages to return from each stream. + /// Amount of time in milliseconds to block in case all the streams are empty. + /// If not provided, or set to null then the read does not block. If set to 0 then it blocks indefinitely. + /// A value of for each stream, or null if the command times out + /// on the server. + /// + /// This is the blocking alternative for . + /// + /// + public static RedisStreamEntries[]? XRead(this IDatabase db, RedisKey[] keys, RedisValue[] positions, int? count = null, int? timeoutMilliseconds = null) + { + var command = CoreCommandBuilder.XRead(keys, positions, count, timeoutMilliseconds); + return db.Execute(command).ToRedisStreamEntries(); + } + + /// + /// Syntactic sugar for , + /// where only one stream is being read from. + /// + /// The class where this extension method is applied. + /// Key of the stream where to read from. + /// The position from which to begin reading. See + /// for special Ids that can be used. + /// The maximum number of messages to return from each stream. + /// Amount of time in milliseconds to block in case all the streams are empty. + /// If not provided, or set to null then the read does not block. If set to 0 then it blocks indefinitely. + /// A list with the data read from the stream, of null if the command + /// times out on the server. + /// + /// This is the blocking alternative for . + /// + /// + public static StreamEntry[]? XRead(this IDatabase db, RedisKey key, RedisValue position, int? count = null, int? timeoutMilliseconds = null) + { + var result = XRead(db, new[] { key }, new[] { position }, count, timeoutMilliseconds); + if (result == null || result.Length == 0) + { + return null; + } + return result[0].Entries; + } + + /// + /// The XREADGROUP command. + /// + /// Read new or historical messages in one or several streams, for a consumer in a consumer group. + /// + /// The class where this extension method is applied. + /// The consumer group name. + /// The name of the consumer in the consumer group. + /// Keys of the streams where to read from. + /// The positions from which to begin reading for each stream. See + /// for special Ids that can be used. + /// The maximum number of messages to return from each stream. + /// Amount of time in milliseconds to block in case all the streams are empty. + /// If not provided, or set to null then the read does not block. If set to 0 then it blocks indefinitely. + /// If set to true then inform the server that it should not wait for ACK for the + /// messages it sends to this read call. + /// A value of for each stream, or null if the command times out + /// on the server. + /// + /// This is the blocking alternative for . + /// + /// + public static RedisStreamEntries[]? XReadGroup(this IDatabase db, RedisValue groupName, RedisValue consumerName, RedisKey[] keys, RedisValue[] positions, int? count = null, int? timeoutMilliseconds = null, bool? noAck = null) + { + var command = CoreCommandBuilder.XReadGroup(groupName, consumerName, keys, positions, count, timeoutMilliseconds, noAck); + return db.Execute(command).ToRedisStreamEntries(); + } + + /// + /// Syntactic sugar for , + /// where only one stream is being read from. + /// + /// The class where this extension method is applied. + /// The consumer group name. + /// The name of the consumer in the consumer group. + /// Key of the stream where to read from. + /// The position from which to begin reading. See + /// for special Ids that can be used. + /// The maximum number of messages to return from each stream. + /// Amount of time in milliseconds to block in case all the streams are empty. + /// If not provided, or set to null then the read does not block. If set to 0 then it blocks indefinitely. + /// If set to true then inform the server that it should not wait for ACK for the + /// messages it sends to this read call. + /// A list with the data read from the stream, of null if the command + /// times out on the server. + /// + /// This is the blocking alternative for . + /// + /// + public static StreamEntry[]? XReadGroup(this IDatabase db, RedisValue groupName, RedisValue consumerName, RedisKey key, RedisValue position, int? count = null, int? timeoutMilliseconds = null, bool? noAck = null) + { + var result = XReadGroup(db, groupName, consumerName, new[] { key }, new[] { position }, count, timeoutMilliseconds, noAck); + if (result == null || result.Length == 0) + { + return null; + } + return result[0].Entries; + } } } diff --git a/src/NRedisStack/CoreCommands/DataTypes/RedisStreamEntries.cs b/src/NRedisStack/CoreCommands/DataTypes/RedisStreamEntries.cs new file mode 100644 index 00000000..e472ec7b --- /dev/null +++ b/src/NRedisStack/CoreCommands/DataTypes/RedisStreamEntries.cs @@ -0,0 +1,25 @@ +using StackExchange.Redis; + +namespace NRedisStack.Core.DataTypes; + +/// +/// Holds the key and the entries for a Redis Stream, as returned by, for example, the XREAD or the XREADGROUP commands. +/// +public readonly struct RedisStreamEntries +{ + internal RedisStreamEntries(RedisKey key, StreamEntry[] entries) + { + Key = key; + Entries = entries; + } + + /// + /// The key for the stream. + /// + public RedisKey Key { get; } + + /// + /// An array of entries contained within the stream. + /// + public StreamEntry[] Entries { get; } +} diff --git a/src/NRedisStack/CoreCommands/DataTypes/StreamSpecialIds.cs b/src/NRedisStack/CoreCommands/DataTypes/StreamSpecialIds.cs new file mode 100644 index 00000000..b10b2bc5 --- /dev/null +++ b/src/NRedisStack/CoreCommands/DataTypes/StreamSpecialIds.cs @@ -0,0 +1,22 @@ +namespace NRedisStack.Core.DataTypes; + +/// +/// Constants for special stream Ids, to be used, for example, with the XREAD and XREADGROUP commands +/// +public class StreamSpecialIds +{ + /// + /// Smallest incomplete ID, can be used for reading from the very first message in a stream. + /// + public const string AllMessagesId = "0"; + + /// + /// For receiving only new messages that arrive after blocking on a read. + /// + public const string NewMessagesId = "$"; + + /// + /// For receiving only messages that were never delivered to any other consumer. + /// + public const string UndeliveredMessagesId = ">"; +} \ No newline at end of file diff --git a/src/NRedisStack/CoreCommands/Literals/CommandArgs.cs b/src/NRedisStack/CoreCommands/Literals/CommandArgs.cs index 9611cefd..edb8142f 100644 --- a/src/NRedisStack/CoreCommands/Literals/CommandArgs.cs +++ b/src/NRedisStack/CoreCommands/Literals/CommandArgs.cs @@ -2,11 +2,15 @@ namespace NRedisStack.Core.Literals { internal static class CoreArgs { + public const string BLOCK = "BLOCK"; public const string COUNT = "COUNT"; + public const string GROUP = "GROUP"; public const string LEFT = "LEFT"; public const string MAX = "MAX"; public const string MIN = "MIN"; + public const string NOACK = "NOACK"; public const string RIGHT = "RIGHT"; + public const string STREAMS = "STREAMS"; public const string lib_name = "LIB-NAME"; public const string lib_ver = "LIB-VER"; } diff --git a/src/NRedisStack/CoreCommands/Literals/Commands.cs b/src/NRedisStack/CoreCommands/Literals/Commands.cs index 7e969144..2b312449 100644 --- a/src/NRedisStack/CoreCommands/Literals/Commands.cs +++ b/src/NRedisStack/CoreCommands/Literals/Commands.cs @@ -15,5 +15,7 @@ internal static class RedisCoreCommands public const string BZPOPMIN = "BZPOPMIN"; public const string CLIENT = "CLIENT"; public const string SETINFO = "SETINFO"; + public const string XREAD = "XREAD"; + public const string XREADGROUP = "XREADGROUP"; } } diff --git a/src/NRedisStack/ResponseParser.cs b/src/NRedisStack/ResponseParser.cs index 58546d37..0be0b169 100644 --- a/src/NRedisStack/ResponseParser.cs +++ b/src/NRedisStack/ResponseParser.cs @@ -802,5 +802,56 @@ public static Dictionary[] ToDictionarys(this RedisResult r return new Tuple>(resultKey, values); } + + public static RedisStreamEntries[]? ToRedisStreamEntries(this RedisResult result) + { + if (result.IsNull) + { + return null; + } + + var resultArray = (RedisResult[])result!; + RedisStreamEntries[] redisStreamEntries = new RedisStreamEntries[resultArray.Length]; + for (int i = 0; i < resultArray.Length; i++) + { + RedisResult[] streamResultArray = (RedisResult[])resultArray[i]!; + RedisKey streamKey = streamResultArray[0].ToRedisKey(); + StreamEntry[] streamEntries = ParseStreamEntries(streamResultArray[1].ToArray()); + redisStreamEntries[i] = new RedisStreamEntries(streamKey, streamEntries); + } + + return redisStreamEntries; + } + + private static StreamEntry[] ParseStreamEntries(IReadOnlyList results) + { + int count = results.Count; + StreamEntry[] streamEntries = new StreamEntry[count]; + + for (int i = 0; i < count; i++) + { + RedisResult[] streamEntryArray = (RedisResult[])results[i]!; + RedisValue key = streamEntryArray[0].ToRedisValue(); + NameValueEntry[] nameValueEntries = ParseNameValueEntries(streamEntryArray[1].ToArray()); + streamEntries[i] = new StreamEntry(key, nameValueEntries); + } + + return streamEntries; + } + + private static NameValueEntry[] ParseNameValueEntries(IReadOnlyList redisResults) + { + int count = redisResults.Count / 2; + var nameValueEntries = new NameValueEntry[count]; + + for (int i = 0; i < count; i++) + { + nameValueEntries[i] = new NameValueEntry( + redisResults[2 * i].ToRedisValue(), + redisResults[2 * i + 1].ToRedisValue()); + } + + return nameValueEntries; + } } } \ No newline at end of file diff --git a/tests/NRedisStack.Tests/Core Commands/CoreTests.cs b/tests/NRedisStack.Tests/Core Commands/CoreTests.cs index dfad0471..126d10b4 100644 --- a/tests/NRedisStack.Tests/Core Commands/CoreTests.cs +++ b/tests/NRedisStack.Tests/Core Commands/CoreTests.cs @@ -656,4 +656,342 @@ public void TestBRPopLPush() Assert.Equal(3, db.ListLength("list-two")); Assert.Equal("b", db.ListLeftPop("list-two")); } + + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public void TestXRead() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + db.StreamAdd("my-stream", "a", 1); + db.StreamAdd("my-stream", "b", 7); + + var result = db.XRead("my-stream", StreamSpecialIds.AllMessagesId, + count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Single(result); + + StreamEntry streamEntry = result![0]; + var lastKey = streamEntry.Id; + Assert.Single(streamEntry.Values); + Assert.Equal("a", streamEntry.Values[0].Name); + Assert.Equal(1, streamEntry.Values[0].Value); + + result = db.XRead("my-stream", lastKey, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Single(result); + + streamEntry = result![0]; + Assert.Single(streamEntry.Values); + Assert.Equal("b", streamEntry.Values[0].Name); + Assert.Equal(7, streamEntry.Values[0].Value); + } + + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public void TestXReadMultipleStreams() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + db.StreamAdd("stream-one", "a", 1); + db.StreamAdd("stream-one", "b", 7); + db.StreamAdd("stream-two", "c", "foo"); + db.StreamAdd("stream-two", "d", "bar"); + + var result = db.XRead(new RedisKey[] { "stream-one", "stream-two" }, + new RedisValue[] { StreamSpecialIds.AllMessagesId, StreamSpecialIds.AllMessagesId }, + count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Equal(2, result!.Length); + + Assert.Single(result![0].Entries); + var lastKeyOne = result![0].Entries[0].Id; + Assert.Single(result![0].Entries[0].Values); + Assert.Equal("a", result![0].Entries[0].Values[0].Name); + Assert.Equal(1, result![0].Entries[0].Values[0].Value); + + Assert.Single(result![1].Entries); + var lastKeyTwo = result![1].Entries[0].Id; + Assert.Single(result![1].Entries[0].Values); + Assert.Equal("c", result![1].Entries[0].Values[0].Name); + Assert.Equal("foo", result![1].Entries[0].Values[0].Value); + + result = db.XRead(new RedisKey[] { "stream-one", "stream-two" }, + new RedisValue[] { lastKeyOne, lastKeyTwo }, + count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Equal(2, result!.Length); + + Assert.Single(result![0].Entries); + Assert.Single(result![0].Entries[0].Values); + Assert.Equal("b", result![0].Entries[0].Values[0].Name); + Assert.Equal(7, result![0].Entries[0].Values[0].Value); + + Assert.Single(result![1].Entries); + Assert.Single(result![1].Entries[0].Values); + Assert.Equal("d", result![1].Entries[0].Values[0].Name); + Assert.Equal("bar", result![1].Entries[0].Values[0].Value); + } + + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public void TestXReadOnlyNewMessages() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + db.StreamAdd("my-stream", "a", 1); + + // Reading only new messages will yield null, because we don't add any and the read times out. + var result = db.XRead("my-stream", StreamSpecialIds.NewMessagesId, + count: 1, timeoutMilliseconds: 500); + + Assert.Null(result); + } + + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public void TestXReadNoKeysProvided() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + Assert.Throws(() => db.XRead(Array.Empty(), + new RedisValue[] { StreamSpecialIds.NewMessagesId })); + } + + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public void TestXReadMismatchedKeysAndPositionsCountsProvided() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + Assert.Throws(() => db.XRead(new RedisKey[] { "my-stream" }, + new RedisValue[] { StreamSpecialIds.NewMessagesId, StreamSpecialIds.NewMessagesId })); + } + + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public void TestXReadGroup() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + var groupCreationResult = db.StreamCreateConsumerGroup("my-stream", "my-group"); + Assert.True(groupCreationResult); + + db.StreamAdd("my-stream", "a", 1); + db.StreamAdd("my-stream", "b", 7); + db.StreamAdd("my-stream", "c", 11); + db.StreamAdd("my-stream", "d", 12); + + // Read one message by each consumer. + var result = db.XReadGroup("my-group", "consumer-a", + "my-stream", StreamSpecialIds.UndeliveredMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Single(result); + + var consumerAIdOne = result![0].Id; + Assert.Single(result[0].Values); + Assert.Equal("a", result![0].Values[0].Name); + Assert.Equal(1, result![0].Values[0].Value); + + result = db.XReadGroup("my-group", "consumer-b", + "my-stream", StreamSpecialIds.UndeliveredMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Single(result); + + var consumerBIdOne = result![0].Id; + Assert.Single(result[0].Values); + Assert.Equal("b", result![0].Values[0].Name); + Assert.Equal(7, result![0].Values[0].Value); + + // Read another message from each consumer, don't ACK anything. + result = db.XReadGroup("my-group", "consumer-a", + "my-stream", StreamSpecialIds.UndeliveredMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Single(result); + + var consumerAIdTwo = result![0].Id; + Assert.Single(result![0].Values); + Assert.Equal("c", result![0].Values[0].Name); + Assert.Equal(11, result![0].Values[0].Value); + + result = db.XReadGroup("my-group", "consumer-b", + "my-stream", StreamSpecialIds.UndeliveredMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Single(result); + + var consumerBIdTwo = result![0].Id; + Assert.Single(result![0].Values); + Assert.Equal("d", result![0].Values[0].Name); + Assert.Equal(12, result![0].Values[0].Value); + + // Since we didn't ACK anything, the pending messages can be re-read with the right ID. + result = db.XReadGroup("my-group", "consumer-a", + "my-stream", StreamSpecialIds.AllMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Single(result); + + Assert.Single(result![0].Values); + Assert.Equal("a", result![0].Values[0].Name); + Assert.Equal(1, result![0].Values[0].Value); + + result = db.XReadGroup("my-group", "consumer-b", + "my-stream", StreamSpecialIds.AllMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Single(result); + + Assert.Single(result![0].Values); + Assert.Equal("b", result![0].Values[0].Name); + Assert.Equal(7, result![0].Values[0].Value); + + // ACK the messages. + var ackedMessagesCount = db.StreamAcknowledge("my-stream", "my-group", + new[] { consumerAIdOne, consumerAIdTwo, consumerBIdOne, consumerBIdTwo }); + Assert.Equal(4, ackedMessagesCount); + + // After ACK we don't see anything pending. + result = db.XReadGroup("my-group", "consumer-a", + "my-stream", StreamSpecialIds.AllMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Empty(result); + + result = db.XReadGroup("my-group", "consumer-b", + "my-stream", StreamSpecialIds.AllMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Empty(result); + } + + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public void TestXReadGroupNoAck() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + var groupCreationResult = db.StreamCreateConsumerGroup("my-stream", "my-group"); + Assert.True(groupCreationResult); + + db.StreamAdd("my-stream", "a", 1); + db.StreamAdd("my-stream", "b", 7); + db.StreamAdd("my-stream", "c", 11); + db.StreamAdd("my-stream", "d", 12); + + var result = db.XReadGroup("my-group", "consumer-a", + "my-stream", StreamSpecialIds.UndeliveredMessagesId, + count: 1, timeoutMilliseconds: 1000, noAck: true); + + Assert.NotNull(result); + Assert.Single(result); + + Assert.Single(result![0].Values); + Assert.Equal("a", result![0].Values[0].Name); + Assert.Equal(1, result![0].Values[0].Value); + + // We don't see anything pending because of the NOACK. + result = db.XReadGroup("my-group", "consumer-a", + "my-stream", StreamSpecialIds.AllMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Empty(result); + } + + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public void TestXReadGroupMultipleStreams() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + var groupCreationResult = db.StreamCreateConsumerGroup("stream-one", "my-group"); + Assert.True(groupCreationResult); + + groupCreationResult = db.StreamCreateConsumerGroup("stream-two", "my-group"); + Assert.True(groupCreationResult); + + db.StreamAdd("stream-one", "a", 1); + db.StreamAdd("stream-two", "b", 7); + db.StreamAdd("stream-one", "c", 11); + db.StreamAdd("stream-two", "d", 17); + + var result = db.XReadGroup("my-group", "consumer-a", + new RedisKey[] { "stream-one", "stream-two" }, + new RedisValue[] { StreamSpecialIds.UndeliveredMessagesId, StreamSpecialIds.UndeliveredMessagesId }, + count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Equal(2, result!.Length); + + Assert.Single(result![0].Entries); + Assert.Single(result![0].Entries[0].Values); + Assert.Equal("a", result![0].Entries[0].Values[0].Name); + Assert.Equal(1, result![0].Entries[0].Values[0].Value); + + Assert.Single(result![1].Entries); + Assert.Single(result![1].Entries[0].Values); + Assert.Equal("b", result![1].Entries[0].Values[0].Name); + Assert.Equal(7, result![1].Entries[0].Values[0].Value); + + result = db.XReadGroup("my-group", "consumer-b", + new RedisKey[] { "stream-one", "stream-two" }, + new RedisValue[] { StreamSpecialIds.UndeliveredMessagesId, StreamSpecialIds.UndeliveredMessagesId }, + count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Equal(2, result!.Length); + + Assert.Single(result![0].Entries); + Assert.Single(result![0].Entries[0].Values); + Assert.Equal("c", result![0].Entries[0].Values[0].Name); + Assert.Equal(11, result![0].Entries[0].Values[0].Value); + + Assert.Single(result![1].Entries); + Assert.Single(result![1].Entries[0].Values); + Assert.Equal("d", result![1].Entries[0].Values[0].Name); + Assert.Equal(17, result![1].Entries[0].Values[0].Value); + } + + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public void TestXReadGroupNull() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + var groupCreationResult = db.StreamCreateConsumerGroup("my-stream", "my-group"); + Assert.True(groupCreationResult); + + var result = db.XReadGroup("my-group", "consumer-a", + "my-stream", StreamSpecialIds.UndeliveredMessagesId, + count: 1, timeoutMilliseconds: 500); + + Assert.Null(result); + } + + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public void TestXReadGroupNoKeysProvided() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + Assert.Throws(() => db.XReadGroup("my-group", "consumer", + Array.Empty(), new RedisValue[] { StreamSpecialIds.NewMessagesId })); + } + + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public void TestXReadGroupMismatchedKeysAndPositionsCountsProvided() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + Assert.Throws(() => db.XReadGroup("my-group", "consumer", + new RedisKey[] { "my-stream" }, new RedisValue[] { StreamSpecialIds.NewMessagesId, StreamSpecialIds.NewMessagesId })); + } } \ No newline at end of file From 49150d1b43c62487622bf358be53e85f33bd2e14 Mon Sep 17 00:00:00 2001 From: Gabriel Erzse Date: Mon, 26 Feb 2024 12:40:11 +0200 Subject: [PATCH 2/2] Extend unit test coverage --- tests/NRedisStack.Tests/Core Commands/CoreTests.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/NRedisStack.Tests/Core Commands/CoreTests.cs b/tests/NRedisStack.Tests/Core Commands/CoreTests.cs index 126d10b4..2acc3bcb 100644 --- a/tests/NRedisStack.Tests/Core Commands/CoreTests.cs +++ b/tests/NRedisStack.Tests/Core Commands/CoreTests.cs @@ -707,12 +707,14 @@ public void TestXReadMultipleStreams() Assert.NotNull(result); Assert.Equal(2, result!.Length); + Assert.Equal("stream-one", result![0].Key); Assert.Single(result![0].Entries); var lastKeyOne = result![0].Entries[0].Id; Assert.Single(result![0].Entries[0].Values); Assert.Equal("a", result![0].Entries[0].Values[0].Name); Assert.Equal(1, result![0].Entries[0].Values[0].Value); + Assert.Equal("stream-two", result![1].Key); Assert.Single(result![1].Entries); var lastKeyTwo = result![1].Entries[0].Id; Assert.Single(result![1].Entries[0].Values);