Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
8724231
Fix for FT.CURSOR in cluster; requires single server
mgravell Sep 9, 2025
3986900
update interfaces
mgravell Sep 9, 2025
c9be7e5
dotnet format
mgravell Sep 9, 2025
c6ecb13
use correct routing in AddDocument
mgravell Sep 10, 2025
30f99a4
.gitignore - docker containers
mgravell Sep 10, 2025
db6d6d7
dotnet format
mgravell Sep 10, 2025
de30e25
- enable all-environments over almost all FT tests
mgravell Sep 10, 2025
25eccdb
don't hit disconnected servers when crawling endpoints
mgravell Sep 10, 2025
5841988
dotnet format
mgravell Sep 10, 2025
2aa9e63
more search test tweaks
mgravell Sep 10, 2025
9525f38
rev SE.Redis for RedisValue fix
mgravell Sep 10, 2025
aba8dda
more test fixes
mgravell Sep 10, 2025
41aab7e
dotnet format... again
mgravell Sep 10, 2025
ec6f1ea
fix routing of dictionary methods
mgravell Sep 10, 2025
3800073
actually: not a key
mgravell Sep 10, 2025
2dfbb60
try to add more replication stability
mgravell Sep 10, 2025
5d05a64
TestApplyAndFilterAggregations - loop attempt
mgravell Sep 10, 2025
d6e81aa
only continue on last attempt!
mgravell Sep 10, 2025
cec6e59
allow even more time in TestApplyAndFilterAggregations
mgravell Sep 11, 2025
6cef256
fix CI mstest on .net9
mgravell Sep 11, 2025
e68ba11
grandfather many cluster tests pre 8
mgravell Sep 11, 2025
d6f6020
update local docker file
mgravell Sep 11, 2025
c7097ee
skip a bunch more tests on cluster pre 8
mgravell Sep 11, 2025
cb9c8df
skip TestCreate on cluster < 8
mgravell Sep 11, 2025
1a494f0
Update tests/dockers/docker-compose.yml
mgravell Sep 17, 2025
e49b480
clarify that the enumerable APIs may involve multiple operations
mgravell Sep 17, 2025
5c61a7a
Merge remote-tracking branch 'origin/marc/cursor-read' into marc/curs…
mgravell Sep 17, 2025
59045a5
clarify why/when the old cursor API will fail
mgravell Sep 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -410,3 +410,8 @@ tests/NRedisStack.Tests/redis_credentials/redis_user.crt
# global.json
global.json
tests/NRedisStack.Tests/lcov.net8.0.info

# docker containers
tests/dockers/cluster/
tests/dockers/standalone/
tests/dockers/all/
2 changes: 1 addition & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<!-- primary library -->
<PackageVersion Include="NetTopologySuite" Version="2.6.0" />
<PackageVersion Include="System.Text.Json" Version="9.0.7" />
<PackageVersion Include="StackExchange.Redis" Version="2.8.58" />
<PackageVersion Include="StackExchange.Redis" Version="2.9.17" />
<!-- tests, etc -->
<PackageVersion Include="BouncyCastle.Cryptography" Version="2.6.1" />
<PackageVersion Include="coverlet.collector" Version="6.0.4" />
Expand Down
10 changes: 10 additions & 0 deletions src/NRedisStack/Auxiliary.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,22 @@ public static RedisResult Execute(this IDatabase db, SerializedCommand command)
return db.Execute(command.Command, command.Args);
}

internal static RedisResult Execute(this IServer server, int? db, SerializedCommand command)
{
return server.Execute(db, command.Command, command.Args);
}

public static async Task<RedisResult> ExecuteAsync(this IDatabaseAsync db, SerializedCommand command)
{
((IDatabase)db).SetInfoInPipeline();
return await db.ExecuteAsync(command.Command, command.Args);
}

internal static async Task<RedisResult> ExecuteAsync(this IServer server, int? db, SerializedCommand command)
{
return await server.ExecuteAsync(db, command.Command, command.Args);
}

public static List<RedisResult> ExecuteBroadcast(this IDatabase db, string command)
=> db.ExecuteBroadcast(new SerializedCommand(command));

Expand Down
12 changes: 12 additions & 0 deletions src/NRedisStack/PublicAPI/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1 +1,13 @@
#nullable enable
NRedisStack.ISearchCommands.AggregateEnumerable(string! index, NRedisStack.Search.AggregationRequest! query) -> System.Collections.Generic.IEnumerable<NRedisStack.Search.Aggregation.Row>!
NRedisStack.ISearchCommands.CursorDel(NRedisStack.Search.AggregationResult! result) -> bool
NRedisStack.ISearchCommands.CursorRead(NRedisStack.Search.AggregationResult! result, int? count = null) -> NRedisStack.Search.AggregationResult!
NRedisStack.ISearchCommandsAsync.AggregateAsyncEnumerable(string! index, NRedisStack.Search.AggregationRequest! query) -> System.Collections.Generic.IAsyncEnumerable<NRedisStack.Search.Aggregation.Row>!
NRedisStack.ISearchCommandsAsync.CursorDelAsync(NRedisStack.Search.AggregationResult! result) -> System.Threading.Tasks.Task<bool>!
NRedisStack.ISearchCommandsAsync.CursorReadAsync(NRedisStack.Search.AggregationResult! result, int? count = null) -> System.Threading.Tasks.Task<NRedisStack.Search.AggregationResult!>!
NRedisStack.SearchCommands.AggregateEnumerable(string! index, NRedisStack.Search.AggregationRequest! query) -> System.Collections.Generic.IEnumerable<NRedisStack.Search.Aggregation.Row>!
NRedisStack.SearchCommands.CursorDel(NRedisStack.Search.AggregationResult! result) -> bool
NRedisStack.SearchCommands.CursorRead(NRedisStack.Search.AggregationResult! result, int? count = null) -> NRedisStack.Search.AggregationResult!
NRedisStack.SearchCommandsAsync.AggregateAsyncEnumerable(string! index, NRedisStack.Search.AggregationRequest! query) -> System.Collections.Generic.IAsyncEnumerable<NRedisStack.Search.Aggregation.Row>!
NRedisStack.SearchCommandsAsync.CursorDelAsync(NRedisStack.Search.AggregationResult! result) -> System.Threading.Tasks.Task<bool>!
NRedisStack.SearchCommandsAsync.CursorReadAsync(NRedisStack.Search.AggregationResult! result, int? count = null) -> System.Threading.Tasks.Task<NRedisStack.Search.AggregationResult!>!
14 changes: 14 additions & 0 deletions src/NRedisStack/ResponseParser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,20 @@ public static AggregationResult ToAggregationResult(this RedisResult result, Agg
}
}

internal static AggregationResult ToAggregationResult(this RedisResult result, string indexName, AggregationRequest query, IServer? server, int? database)
{
if (query.IsWithCursor())
{
var results = (RedisResult[])result!;

return new AggregationResult.WithCursorAggregationResult(indexName, results[0], (long)results[1], server, database);
}
else
{
return new(result);
}
}

public static Dictionary<string, RedisResult>[] ToDictionarys(this RedisResult result)
{
var resArr = (RedisResult[])result!;
Expand Down
2 changes: 2 additions & 0 deletions src/NRedisStack/Search/AggregationRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public AggregationRequest Cursor(int? count = null, long? maxIdle = null)

if (count != null)
{
Count = count;
args.Add(SearchArgs.COUNT);
args.Add(count);
}
Expand All @@ -139,6 +140,7 @@ public AggregationRequest Cursor(int? count = null, long? maxIdle = null)
}
return this;
}
internal int? Count { get; set; }

public AggregationRequest Params(Dictionary<string, object> nameValue)
{
Expand Down
19 changes: 16 additions & 3 deletions src/NRedisStack/Search/AggregationResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,29 @@

namespace NRedisStack.Search;

public sealed class AggregationResult
public class AggregationResult
{
// internal subclass for WITHCURSOR calls, which need to be issued to the same connection
internal sealed class WithCursorAggregationResult : AggregationResult
{
internal WithCursorAggregationResult(string indexName, RedisResult result, long cursorId, IServer? server,
int? database) : base(result, cursorId)
{
IndexName = indexName;
Server = server;
Database = database;
}
public string IndexName { get; }
public IServer? Server { get; }
public int? Database { get; }
}

public long TotalResults { get; }
private readonly Dictionary<string, object>[] _results;
private Dictionary<string, RedisValue>[]? _resultsAsRedisValues;

public long CursorId { get; }


internal AggregationResult(RedisResult result, long cursorId = -1)
{
var arr = (RedisResult[])result!;
Expand Down Expand Up @@ -45,7 +59,6 @@ internal AggregationResult(RedisResult result, long cursorId = -1)
CursorId = cursorId;
}


/// <summary>
/// takes a Redis multi-bulk array represented by a RedisResult[] and recursively processes its elements.
/// For each element in the array, it checks if it's another multi-bulk array, and if so, it recursively calls itself.
Expand Down
38 changes: 35 additions & 3 deletions src/NRedisStack/Search/ISearchCommands.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.ComponentModel;
using NRedisStack.Search;
using NRedisStack.Search.Aggregation;
using NRedisStack.Search.DataTypes;
using StackExchange.Redis;

Expand All @@ -18,11 +20,20 @@ public interface ISearchCommands
/// Run a search query on an index, and perform aggregate transformations on the results.
/// </summary>
/// <param name="index">The index name.</param>
/// <param name="query">The query</param>
/// <param name="query">The query.</param>
/// <returns>An <see langword="AggregationResult"/> object</returns>
/// <remarks><seealso href="https://redis.io/commands/ft.aggregate"/></remarks>
AggregationResult Aggregate(string index, AggregationRequest query);

/// <summary>
/// Run a search query on an index, and perform aggregate transformations on the results.
/// </summary>
/// <param name="index">The index name.</param>
/// <param name="query">The query.</param>
/// <returns>A sequence of <see langword="Row"/> values.</returns>
/// <remarks><seealso href="https://redis.io/commands/ft.aggregate"/></remarks>
IEnumerable<Row> AggregateEnumerable(string index, AggregationRequest query);

/// <summary>
/// Add an alias to an index.
/// </summary>
Expand Down Expand Up @@ -92,22 +103,43 @@ public interface ISearchCommands
/// <summary>
/// Delete a cursor from the index.
/// </summary>
/// <param name="indexName">The index name</param>
/// <param name="indexName">The index name.</param>
/// <param name="cursorId">The cursor's ID.</param>
/// <returns><see langword="true"/> if it has been deleted, <see langword="false"/> if it did not exist.</returns>
/// <remarks><seealso href="https://redis.io/commands/ft.cursor-del/"/></remarks>
[Obsolete("When possible, use CursorDel(AggregationResult) instead. This legacy API will not work correctly on CLUSTER environments, but will continue to work for single-node deployments.")]
[Browsable(false), EditorBrowsable(EditorBrowsableState.Never)]
bool CursorDel(string indexName, long cursorId);

/// <summary>
/// Delete a cursor from the index.
/// </summary>
/// <param name="result">The result of a previous call to Aggregate or CursorRead.</param>
/// <returns><see langword="true"/> if it has been deleted, <see langword="false"/> if it did not exist.</returns>
/// <remarks><seealso href="https://redis.io/commands/ft.cursor-del/"/></remarks>
bool CursorDel(AggregationResult result);

/// <summary>
/// Read next results from an existing cursor.
/// </summary>
/// <param name="indexName">The index name</param>
/// <param name="indexName">The index name.</param>
/// <param name="cursorId">The cursor's ID.</param>
/// <param name="count">Limit the amount of returned results.</param>
/// <returns>A AggregationResult object with the results</returns>
/// <remarks><seealso href="https://redis.io/commands/ft.cursor-read/"/></remarks>
[Obsolete("When possible, use AggregateEnumerable or CursorRead(AggregationResult, int?) instead. This legacy API will not work correctly on CLUSTER environments, but will continue to work for single-node deployments.")]
[Browsable(false), EditorBrowsable(EditorBrowsableState.Never)]
AggregationResult CursorRead(string indexName, long cursorId, int? count = null);

/// <summary>
/// Read next results from an existing cursor.
/// </summary>
/// <param name="result">The result of a previous call to Aggregate or CursorRead.</param>
/// <param name="count">Limit the amount of returned results.</param>
/// <returns>A AggregationResult object with the results</returns>
/// <remarks><seealso href="https://redis.io/commands/ft.cursor-read/"/></remarks>
public AggregationResult CursorRead(AggregationResult result, int? count = null);

/// <summary>
/// Add terms to a dictionary.
/// </summary>
Expand Down
36 changes: 35 additions & 1 deletion src/NRedisStack/Search/ISearchCommandsAsync.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.ComponentModel;
using NRedisStack.Search;
using NRedisStack.Search.Aggregation;
using NRedisStack.Search.DataTypes;
using StackExchange.Redis;

Expand All @@ -14,14 +16,25 @@ public interface ISearchCommandsAsync
Task<RedisResult[]> _ListAsync();

/// <summary>
/// Run a search query on an index, and perform aggregate transformations on the results.
/// Run a search query on an index, and perform aggregate transformations on the results. This operates
/// as a cursor and may involve multiple commands to the server.
/// </summary>
/// <param name="index">The index name.</param>
/// <param name="query">The query</param>
/// <returns>An <see langword="AggregationResult"/> object</returns>
/// <remarks><seealso href="https://redis.io/commands/ft.aggregate"/></remarks>
Task<AggregationResult> AggregateAsync(string index, AggregationRequest query);

/// <summary>
/// Run a search query on an index, and perform aggregate transformations on the results. This operates
/// as a cursor and may involve multiple commands to the server.
/// </summary>
/// <param name="index">The index name.</param>
/// <param name="query">The query.</param>
/// <returns>A sequence of <see langword="Row"/> values.</returns>
/// <remarks><seealso href="https://redis.io/commands/ft.aggregate"/></remarks>
IAsyncEnumerable<Row> AggregateAsyncEnumerable(string index, AggregationRequest query);

/// <summary>
/// Add an alias to an index.
/// </summary>
Expand Down Expand Up @@ -95,8 +108,18 @@ public interface ISearchCommandsAsync
/// <param name="cursorId">The cursor's ID.</param>
/// <returns><see langword="true"/> if it has been deleted, <see langword="false"/> if it did not exist.</returns>
/// <remarks><seealso href="https://redis.io/commands/ft.cursor-del/"/></remarks>
[Obsolete("When possible, use CursorDelAsync(AggregationResult, int?) instead. This legacy API will not work correctly on CLUSTER environments, but will continue to work for single-node deployments.")]
[Browsable(false), EditorBrowsable(EditorBrowsableState.Never)]
Task<bool> CursorDelAsync(string indexName, long cursorId);

/// <summary>
/// Delete a cursor from the index.
/// </summary>
/// <param name="result">The result of a previous call to AggregateAsync or CursorReadAsync.</param>
/// <returns><see langword="true"/> if it has been deleted, <see langword="false"/> if it did not exist.</returns>
/// <remarks><seealso href="https://redis.io/commands/ft.cursor-del/"/></remarks>
Task<bool> CursorDelAsync(AggregationResult result);

/// <summary>
/// Read next results from an existing cursor.
/// </summary>
Expand All @@ -105,8 +128,19 @@ public interface ISearchCommandsAsync
/// <param name="count">Limit the amount of returned results.</param>
/// <returns>A AggregationResult object with the results</returns>
/// <remarks><seealso href="https://redis.io/commands/ft.cursor-read/"/></remarks>
[Obsolete("When possible, use AggregateAsyncEnumerable or CursorReadAsync(AggregationResult, int?) instead. This legacy API will not work correctly on CLUSTER environments, but will continue to work for single-node deployments.")]
[Browsable(false), EditorBrowsable(EditorBrowsableState.Never)]
Task<AggregationResult> CursorReadAsync(string indexName, long cursorId, int? count = null);

/// <summary>
/// Read next results from an existing cursor.
/// </summary>
/// <param name="result">The result of a previous AggregateAsync or CursorReadAsync call.</param>
/// <param name="count">Limit the amount of returned results.</param>
/// <returns>A AggregationResult object with the results</returns>
/// <remarks><seealso href="https://redis.io/commands/ft.cursor-read/"/></remarks>
Task<AggregationResult> CursorReadAsync(AggregationResult result, int? count = null);

/// <summary>
/// Add terms to a dictionary.
/// </summary>
Expand Down
86 changes: 84 additions & 2 deletions src/NRedisStack/Search/SearchCommands.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.ComponentModel;
using NRedisStack.Search;
using NRedisStack.Search.Aggregation;
using NRedisStack.Search.DataTypes;
using StackExchange.Redis;
namespace NRedisStack;
Expand All @@ -16,8 +18,54 @@ public RedisResult[] _List()
public AggregationResult Aggregate(string index, AggregationRequest query)
{
SetDefaultDialectIfUnset(query);
var result = db.Execute(SearchCommandBuilder.Aggregate(index, query));
return result.ToAggregationResult(query);
IServer? server = null;
int? database = null;

var command = SearchCommandBuilder.Aggregate(index, query);
if (query.IsWithCursor())
{
// we can issue this anywhere, but follow-up calls need to be on the same server
server = GetRandomServerForCluster(db, out database);
}

RedisResult result;
if (server is not null)
{
result = server.Execute(database, command);
}
else
{
result = db.Execute(command);
}

return result.ToAggregationResult(index, query, server, database);
}

public IEnumerable<Row> AggregateEnumerable(string index, AggregationRequest query)
{
if (!query.IsWithCursor()) query.Cursor();

var result = Aggregate(index, query);
try
{
while (true)
{
var count = checked((int)result.TotalResults);
for (int i = 0; i < count; i++)
{
yield return result.GetRow(i);
}
if (result.CursorId == 0) break;
result = CursorRead(result, query.Count);
}
}
finally
{
if (result.CursorId != 0)
{
CursorDel(result);
}
}
}

/// <inheritdoc/>
Expand Down Expand Up @@ -72,18 +120,52 @@ public bool Create(string indexName, Schema schema)
}

/// <inheritdoc/>
[Obsolete("When possible, use CursorDel(AggregationResult) instead. This legacy API will not work correctly on CLUSTER environments, but will continue to work for single-node deployments.")]
[Browsable(false), EditorBrowsable(EditorBrowsableState.Never)]
public bool CursorDel(string indexName, long cursorId)
{
return db.Execute(SearchCommandBuilder.CursorDel(indexName, cursorId)).OKtoBoolean();
}

public bool CursorDel(AggregationResult result)
{
if (result is not AggregationResult.WithCursorAggregationResult withCursor)
{
throw new ArgumentException(
message: $"{nameof(CursorDelAsync)} must be called with a value returned from a previous call to {nameof(AggregateAsync)} with a cursor.",
paramName: nameof(result));
}

var command = SearchCommandBuilder.CursorDel(withCursor.IndexName, withCursor.CursorId);
var resp = withCursor.Server is { } server
? server.Execute(withCursor.Database, command)
: db.Execute(command);
return resp.OKtoBoolean();
}

/// <inheritdoc/>
[Obsolete("When possible, use CusorReadEnumerable or CursorRead(AggregationResult, int?) instead. This legacy API will not work correctly on CLUSTER environments, but will continue to work for single-node deployments.")]
[Browsable(false), EditorBrowsable(EditorBrowsableState.Never)]
public AggregationResult CursorRead(string indexName, long cursorId, int? count = null)
{
var resp = db.Execute(SearchCommandBuilder.CursorRead(indexName, cursorId, count)).ToArray();
return new(resp[0], (long)resp[1]);
}

public AggregationResult CursorRead(AggregationResult result, int? count = null)
{
if (result is not AggregationResult.WithCursorAggregationResult withCursor)
{
throw new ArgumentException(message: $"{nameof(CursorReadAsync)} must be called with a value returned from a previous call to {nameof(AggregateAsync)} with a cursor.", paramName: nameof(result));
}
var command = SearchCommandBuilder.CursorRead(withCursor.IndexName, withCursor.CursorId, count);
var rawResult = withCursor.Server is { } server
? server.Execute(withCursor.Database, command)
: db.Execute(command);
var resp = rawResult.ToArray();
return new AggregationResult.WithCursorAggregationResult(withCursor.IndexName, resp[0], (long)resp[1], withCursor.Server, withCursor.Database);
}

/// <inheritdoc/>
public long DictAdd(string dict, params string[] terms)
{
Expand Down
Loading
Loading