Skip to content

Commit c6d31ec

Browse files
committed
Fix wrong aggregation arguments order
1 parent 75dbb2d commit c6d31ec

File tree

4 files changed

+661
-81
lines changed

4 files changed

+661
-81
lines changed

src/main/java/io/lettuce/core/search/arguments/AggregateArgs.java

Lines changed: 71 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -86,15 +86,11 @@ public class AggregateArgs<K, V> {
8686

8787
private Optional<Duration> timeout = Optional.empty();
8888

89-
private final List<GroupBy<K, V>> groupByList = new ArrayList<>();
90-
91-
private final List<SortBy<K>> sortByList = new ArrayList<>();
92-
93-
private final List<Apply<K, V>> applyList = new ArrayList<>();
94-
95-
private Optional<Limit> limit = Optional.empty();
96-
97-
private final List<V> filters = new ArrayList<>();
89+
/**
90+
* Ordered list of pipeline operations (GROUPBY, SORTBY, APPLY, FILTER). These operations must be applied in the order
91+
* specified by the user.
92+
*/
93+
private final List<PipelineOperation<K, ?>> pipelineOperations = new ArrayList<>();
9894

9995
private Optional<WithCursor> withCursor = Optional.empty();
10096

@@ -214,7 +210,7 @@ public Builder<K, V> timeout(Duration timeout) {
214210
* @return the builder.
215211
*/
216212
public Builder<K, V> groupBy(GroupBy<K, V> groupBy) {
217-
args.groupByList.add(groupBy);
213+
args.pipelineOperations.add(groupBy);
218214
return this;
219215
}
220216

@@ -225,7 +221,7 @@ public Builder<K, V> groupBy(GroupBy<K, V> groupBy) {
225221
* @return the builder.
226222
*/
227223
public Builder<K, V> sortBy(SortBy<K> sortBy) {
228-
args.sortByList.add(sortBy);
224+
args.pipelineOperations.add(sortBy);
229225
return this;
230226
}
231227

@@ -236,7 +232,7 @@ public Builder<K, V> sortBy(SortBy<K> sortBy) {
236232
* @return the builder.
237233
*/
238234
public Builder<K, V> apply(Apply<K, V> apply) {
239-
args.applyList.add(apply);
235+
args.pipelineOperations.add(apply);
240236
return this;
241237
}
242238

@@ -268,7 +264,7 @@ public Builder<K, V> apply(Apply<K, V> apply) {
268264
* @return the builder.
269265
*/
270266
public Builder<K, V> limit(long offset, long num) {
271-
args.limit = Optional.of(new Limit(offset, num));
267+
args.pipelineOperations.add(new Limit<>(offset, num));
272268
return this;
273269
}
274270

@@ -302,7 +298,7 @@ public Builder<K, V> limit(long offset, long num) {
302298
* @return the builder.
303299
*/
304300
public Builder<K, V> filter(V filter) {
305-
args.filters.add(filter);
301+
args.pipelineOperations.add(new Filter<>(filter));
306302
return this;
307303
}
308304

@@ -513,32 +509,12 @@ public void build(CommandArgs<K, V> args) {
513509
args.add(t.toMillis());
514510
});
515511

516-
// Add GROUPBY clauses
517-
for (GroupBy<K, V> groupBy : groupByList) {
518-
groupBy.build(args);
519-
}
520-
521-
// Add SORTBY clauses
522-
for (SortBy<K> sortBy : sortByList) {
523-
sortBy.build(args);
524-
}
525-
526-
// Add APPLY clauses
527-
for (Apply<K, V> apply : applyList) {
528-
apply.build(args);
529-
}
530-
531-
// Add LIMIT clause
532-
limit.ifPresent(l -> {
533-
args.add(CommandKeyword.LIMIT);
534-
args.add(l.offset);
535-
args.add(l.num);
536-
});
537-
538-
// Add FILTER clauses
539-
for (V filter : filters) {
540-
args.add(CommandKeyword.FILTER);
541-
args.addValue(filter);
512+
// Add pipeline operations in user-specified order
513+
for (PipelineOperation<K, ?> operation : pipelineOperations) {
514+
// Cast is safe because all operations can build with CommandArgs<K, V>
515+
@SuppressWarnings("unchecked")
516+
PipelineOperation<K, V> typedOperation = (PipelineOperation<K, V>) operation;
517+
typedOperation.build(args);
542518
}
543519

544520
// Add WITHCURSOR clause
@@ -578,6 +554,21 @@ public Optional<WithCursor> getWithCursor() {
578554
return withCursor;
579555
}
580556

557+
/**
558+
* Interface for pipeline operations that need to be applied in user-specified order. This includes GROUPBY, SORTBY, APPLY,
559+
* and FILTER operations.
560+
*/
561+
public interface PipelineOperation<K, V> {
562+
563+
/**
564+
* Build the operation arguments into the command args.
565+
*
566+
* @param args the command args to build into
567+
*/
568+
void build(CommandArgs<K, V> args);
569+
570+
}
571+
581572
// Helper classes
582573
public static class LoadField<K> {
583574

@@ -592,7 +583,7 @@ public static class LoadField<K> {
592583

593584
}
594585

595-
public static class Limit {
586+
public static class Limit<K, V> implements PipelineOperation<K, V> {
596587

597588
final long offset;
598589

@@ -603,6 +594,13 @@ public static class Limit {
603594
this.num = num;
604595
}
605596

597+
@Override
598+
public void build(CommandArgs<K, V> args) {
599+
args.add(CommandKeyword.LIMIT);
600+
args.add(offset);
601+
args.add(num);
602+
}
603+
606604
}
607605

608606
public static class WithCursor {
@@ -676,7 +674,7 @@ public static WithCursor of(Long count) {
676674
* performance.
677675
* </p>
678676
*/
679-
public static class GroupBy<K, V> {
677+
public static class GroupBy<K, V> implements PipelineOperation<K, V> {
680678

681679
private final List<K> properties;
682680

@@ -705,6 +703,7 @@ public static <K, V> GroupBy<K, V> of(K... properties) {
705703
return new GroupBy<>(Arrays.asList(properties));
706704
}
707705

706+
@Override
708707
public void build(CommandArgs<K, V> args) {
709708
args.add(CommandKeyword.GROUPBY);
710709
args.add(properties.size());
@@ -764,7 +763,7 @@ public void build(CommandArgs<K, V> args) {
764763
* using LIMIT.
765764
* </p>
766765
*/
767-
public static class SortBy<K> {
766+
public static class SortBy<K> implements PipelineOperation<K, Object> {
768767

769768
private final List<SortProperty<K>> properties;
770769

@@ -810,7 +809,8 @@ public static <K> SortBy<K> of(SortProperty<K>... properties) {
810809
return new SortBy<>(Arrays.asList(properties));
811810
}
812811

813-
public <V> void build(CommandArgs<K, V> args) {
812+
@Override
813+
public void build(CommandArgs<K, Object> args) {
814814
args.add(CommandKeyword.SORTBY);
815815
// Count includes property + direction pairs
816816
args.add(properties.size() * 2L);
@@ -880,7 +880,7 @@ public <V> void build(CommandArgs<K, V> args) {
880880
* can be referenced by further operations.
881881
* </p>
882882
*/
883-
public static class Apply<K, V> {
883+
public static class Apply<K, V> implements PipelineOperation<K, V> {
884884

885885
private final V expression;
886886

@@ -891,6 +891,7 @@ public Apply(V expression, K name) {
891891
this.name = name;
892892
}
893893

894+
@Override
894895
public void build(CommandArgs<K, V> args) {
895896
args.add(CommandKeyword.APPLY);
896897
args.addValue(expression);
@@ -1065,6 +1066,31 @@ public void build(CommandArgs<K, V> args) {
10651066

10661067
}
10671068

1069+
/**
1070+
* Represents a FILTER clause in an aggregation pipeline.
1071+
*
1072+
* <p>
1073+
* Filters the results using predicate expressions relating to values in each result. Filters are applied after the query
1074+
* and relate to the current state of the pipeline. This allows filtering on computed fields created by APPLY operations or
1075+
* reducer results.
1076+
* </p>
1077+
*/
1078+
public static class Filter<K, V> implements PipelineOperation<K, V> {
1079+
1080+
private final V expression;
1081+
1082+
public Filter(V expression) {
1083+
this.expression = expression;
1084+
}
1085+
1086+
@Override
1087+
public void build(CommandArgs<K, V> args) {
1088+
args.add(CommandKeyword.FILTER);
1089+
args.addValue(expression);
1090+
}
1091+
1092+
}
1093+
10681094
/**
10691095
* Represents a sort property with direction.
10701096
*/

src/test/java/io/lettuce/core/RediSearchCommandBuilderUnitTests.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -635,6 +635,39 @@ void shouldCorrectlyConstructFtAggregateCommandBasic() {
635635
assertThat(buf.toString(StandardCharsets.UTF_8)).isEqualTo(result);
636636
}
637637

638+
@Test
639+
void shouldMaintainPipelineOperationOrder() {
640+
// Test that pipeline operations (GROUPBY, SORTBY, APPLY, FILTER, LIMIT)
641+
// are output in the order specified by the user, not in a fixed order
642+
AggregateArgs<String, String> aggregateArgs = AggregateArgs.<String, String> builder()//
643+
.apply("@price * @quantity", "total_value")// First operation
644+
.filter("@total_value > 100")// Second operation
645+
.groupBy(GroupBy.<String, String> of("category").reduce(Reducer.<String, String> count().as("count")))// Third
646+
// operation
647+
.limit(0, 5)// Fourth operation
648+
.sortBy(SortBy.of("count", SortDirection.DESC))// Fifth operation
649+
.build();
650+
651+
Command<String, String, AggregationReply<String, String>> command = builder.ftAggregate(MY_KEY, MY_QUERY,
652+
aggregateArgs);
653+
ByteBuf buf = Unpooled.directBuffer();
654+
command.encode(buf);
655+
656+
// Expected order should match the user's call order: APPLY -> FILTER -> GROUPBY -> LIMIT -> SORTBY
657+
String result = "*26\r\n" + "$12\r\n" + "FT.AGGREGATE\r\n" + "$3\r\n" + "idx\r\n" + "$1\r\n" + "*\r\n"//
658+
+ "$5\r\n" + "APPLY\r\n" + "$18\r\n" + "@price * @quantity\r\n" + "$2\r\n" + "AS\r\n" + "$11\r\n"
659+
+ "total_value\r\n"//
660+
+ "$6\r\n" + "FILTER\r\n" + "$18\r\n" + "@total_value > 100\r\n"//
661+
+ "$7\r\n" + "GROUPBY\r\n" + "$1\r\n" + "1\r\n" + "$9\r\n" + "@category\r\n"//
662+
+ "$6\r\n" + "REDUCE\r\n" + "$5\r\n" + "COUNT\r\n" + "$1\r\n" + "0\r\n" + "$2\r\n" + "AS\r\n" + "$5\r\n"
663+
+ "count\r\n"//
664+
+ "$5\r\n" + "LIMIT\r\n" + "$1\r\n" + "0\r\n" + "$1\r\n" + "5\r\n"//
665+
+ "$6\r\n" + "SORTBY\r\n" + "$1\r\n" + "2\r\n" + "$6\r\n" + "@count\r\n" + "$4\r\n" + "DESC\r\n"//
666+
+ "$7\r\n" + "DIALECT\r\n" + "$1\r\n2\r\n";//
667+
668+
assertThat(buf.toString(StandardCharsets.UTF_8)).isEqualTo(result);
669+
}
670+
638671
@Test
639672
void shouldCorrectlyConstructFtAggregateCommandWithArgs() {
640673
AggregateArgs<String, String> aggregateArgs = AggregateArgs.<String, String> builder()//

0 commit comments

Comments
 (0)