5656import org .junit .Before ;
5757
5858import java .util .ArrayList ;
59- import java .util .Arrays ;
6059import java .util .Collections ;
6160import java .util .HashMap ;
6261import java .util .List ;
6362import java .util .Map ;
6463import java .util .Optional ;
64+ import java .util .concurrent .CopyOnWriteArrayList ;
6565import java .util .concurrent .atomic .AtomicInteger ;
6666import java .util .stream .Collectors ;
6767import java .util .stream .Stream ;
6868
6969import static org .hamcrest .Matchers .equalTo ;
70+ import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
7071import static org .hamcrest .Matchers .instanceOf ;
7172import static org .hamcrest .Matchers .lessThanOrEqualTo ;
7273import static org .hamcrest .Matchers .not ;
7374
7475public class SearchPhaseControllerTests extends ESTestCase {
7576 private SearchPhaseController searchPhaseController ;
77+ private List <Boolean > reductions ;
7678
7779 @ Before
7880 public void setup () {
81+ reductions = new CopyOnWriteArrayList <>();
7982 searchPhaseController = new SearchPhaseController (
80- (b ) -> new InternalAggregation .ReduceContext (BigArrays .NON_RECYCLING_INSTANCE , null , b ));
83+ (finalReduce ) -> {
84+ reductions .add (finalReduce );
85+ return new InternalAggregation .ReduceContext (BigArrays .NON_RECYCLING_INSTANCE , null , finalReduce );
86+ });
8187 }
8288
8389 public void testSort () {
@@ -158,7 +164,7 @@ public void testMerge() {
158164 AtomicArray <SearchPhaseResult > queryResults = generateQueryResults (nShards , suggestions , queryResultSize , false );
159165 for (boolean trackTotalHits : new boolean [] {true , false }) {
160166 SearchPhaseController .ReducedQueryPhase reducedQueryPhase =
161- searchPhaseController .reducedQueryPhase (queryResults .asList (), false , trackTotalHits );
167+ searchPhaseController .reducedQueryPhase (queryResults .asList (), false , trackTotalHits , true );
162168 AtomicArray <SearchPhaseResult > fetchResults = generateFetchResults (nShards ,
163169 reducedQueryPhase .sortedTopDocs .scoreDocs , reducedQueryPhase .suggest );
164170 InternalSearchResponse mergedResponse = searchPhaseController .merge (false ,
@@ -308,14 +314,15 @@ private static AtomicArray<SearchPhaseResult> generateFetchResults(int nShards,
308314
309315 public void testConsumer () {
310316 int bufferSize = randomIntBetween (2 , 3 );
311- SearchRequest request = new SearchRequest ();
317+ SearchRequest request = randomBoolean () ? new SearchRequest () : new SearchRequest ( "remote" );
312318 request .source (new SearchSourceBuilder ().aggregation (AggregationBuilders .avg ("foo" )));
313319 request .setBatchedReduceSize (bufferSize );
314320 InitialSearchPhase .ArraySearchPhaseResults <SearchPhaseResult > consumer = searchPhaseController .newSearchPhaseResults (request , 3 );
321+ assertEquals (0 , reductions .size ());
315322 QuerySearchResult result = new QuerySearchResult (0 , new SearchShardTarget ("node" , new Index ("a" , "b" ), 0 , null ));
316323 result .topDocs (new TopDocsAndMaxScore (new TopDocs (new TotalHits (0 , TotalHits .Relation .EQUAL_TO ), new ScoreDoc [0 ]), Float .NaN ),
317324 new DocValueFormat [0 ]);
318- InternalAggregations aggs = new InternalAggregations (Arrays . asList (new InternalMax ("test" , 1.0D , DocValueFormat .RAW ,
325+ InternalAggregations aggs = new InternalAggregations (Collections . singletonList (new InternalMax ("test" , 1.0D , DocValueFormat .RAW ,
319326 Collections .emptyList (), Collections .emptyMap ())));
320327 result .aggregations (aggs );
321328 result .setShardIndex (0 );
@@ -324,7 +331,7 @@ public void testConsumer() {
324331 result = new QuerySearchResult (1 , new SearchShardTarget ("node" , new Index ("a" , "b" ), 0 , null ));
325332 result .topDocs (new TopDocsAndMaxScore (new TopDocs (new TotalHits (0 , TotalHits .Relation .EQUAL_TO ), new ScoreDoc [0 ]), Float .NaN ),
326333 new DocValueFormat [0 ]);
327- aggs = new InternalAggregations (Arrays . asList (new InternalMax ("test" , 3.0D , DocValueFormat .RAW ,
334+ aggs = new InternalAggregations (Collections . singletonList (new InternalMax ("test" , 3.0D , DocValueFormat .RAW ,
328335 Collections .emptyList (), Collections .emptyMap ())));
329336 result .aggregations (aggs );
330337 result .setShardIndex (2 );
@@ -333,23 +340,29 @@ public void testConsumer() {
333340 result = new QuerySearchResult (1 , new SearchShardTarget ("node" , new Index ("a" , "b" ), 0 , null ));
334341 result .topDocs (new TopDocsAndMaxScore (new TopDocs (new TotalHits (0 , TotalHits .Relation .EQUAL_TO ), new ScoreDoc [0 ]), Float .NaN ),
335342 new DocValueFormat [0 ]);
336- aggs = new InternalAggregations (Arrays . asList (new InternalMax ("test" , 2.0D , DocValueFormat .RAW ,
343+ aggs = new InternalAggregations (Collections . singletonList (new InternalMax ("test" , 2.0D , DocValueFormat .RAW ,
337344 Collections .emptyList (), Collections .emptyMap ())));
338345 result .aggregations (aggs );
339346 result .setShardIndex (1 );
340347 consumer .consumeResult (result );
341- int numTotalReducePhases = 1 ;
348+ final int numTotalReducePhases ;
342349 if (bufferSize == 2 ) {
343350 assertThat (consumer , instanceOf (SearchPhaseController .QueryPhaseResultConsumer .class ));
344351 assertEquals (1 , ((SearchPhaseController .QueryPhaseResultConsumer )consumer ).getNumReducePhases ());
345352 assertEquals (2 , ((SearchPhaseController .QueryPhaseResultConsumer )consumer ).getNumBuffered ());
346- numTotalReducePhases ++;
353+ assertEquals (1 , reductions .size ());
354+ assertEquals (false , reductions .get (0 ));
355+ numTotalReducePhases = 2 ;
347356 } else {
348357 assertThat (consumer , not (instanceOf (SearchPhaseController .QueryPhaseResultConsumer .class )));
358+ assertEquals (0 , reductions .size ());
359+ numTotalReducePhases = 1 ;
349360 }
350361
351362 SearchPhaseController .ReducedQueryPhase reduce = consumer .reduce ();
352363 assertEquals (numTotalReducePhases , reduce .numReducePhases );
364+ assertEquals (numTotalReducePhases , reductions .size ());
365+ assertFinalReduction (request );
353366 InternalMax max = (InternalMax ) reduce .aggregations .asList ().get (0 );
354367 assertEquals (3.0D , max .getValue (), 0.0D );
355368 assertFalse (reduce .sortedTopDocs .isSortedByField );
@@ -362,7 +375,7 @@ public void testConsumerConcurrently() throws InterruptedException {
362375 int expectedNumResults = randomIntBetween (1 , 100 );
363376 int bufferSize = randomIntBetween (2 , 200 );
364377
365- SearchRequest request = new SearchRequest ();
378+ SearchRequest request = randomBoolean () ? new SearchRequest () : new SearchRequest ( "remote" );
366379 request .source (new SearchSourceBuilder ().aggregation (AggregationBuilders .avg ("foo" )));
367380 request .setBatchedReduceSize (bufferSize );
368381 InitialSearchPhase .ArraySearchPhaseResults <SearchPhaseResult > consumer =
@@ -378,7 +391,7 @@ public void testConsumerConcurrently() throws InterruptedException {
378391 result .topDocs (new TopDocsAndMaxScore (
379392 new TopDocs (new TotalHits (1 , TotalHits .Relation .EQUAL_TO ), new ScoreDoc [] {new ScoreDoc (0 , number )}), number ),
380393 new DocValueFormat [0 ]);
381- InternalAggregations aggs = new InternalAggregations (Arrays . asList (new InternalMax ("test" , (double ) number ,
394+ InternalAggregations aggs = new InternalAggregations (Collections . singletonList (new InternalMax ("test" , (double ) number ,
382395 DocValueFormat .RAW , Collections .emptyList (), Collections .emptyMap ())));
383396 result .aggregations (aggs );
384397 result .setShardIndex (id );
@@ -392,6 +405,7 @@ public void testConsumerConcurrently() throws InterruptedException {
392405 threads [i ].join ();
393406 }
394407 SearchPhaseController .ReducedQueryPhase reduce = consumer .reduce ();
408+ assertFinalReduction (request );
395409 InternalMax internalMax = (InternalMax ) reduce .aggregations .asList ().get (0 );
396410 assertEquals (max .get (), internalMax .getValue (), 0.0D );
397411 assertEquals (1 , reduce .sortedTopDocs .scoreDocs .length );
@@ -407,7 +421,7 @@ public void testConsumerConcurrently() throws InterruptedException {
407421 public void testConsumerOnlyAggs () {
408422 int expectedNumResults = randomIntBetween (1 , 100 );
409423 int bufferSize = randomIntBetween (2 , 200 );
410- SearchRequest request = new SearchRequest ();
424+ SearchRequest request = randomBoolean () ? new SearchRequest () : new SearchRequest ( "remote" );
411425 request .source (new SearchSourceBuilder ().aggregation (AggregationBuilders .avg ("foo" )).size (0 ));
412426 request .setBatchedReduceSize (bufferSize );
413427 InitialSearchPhase .ArraySearchPhaseResults <SearchPhaseResult > consumer =
@@ -419,14 +433,15 @@ public void testConsumerOnlyAggs() {
419433 QuerySearchResult result = new QuerySearchResult (i , new SearchShardTarget ("node" , new Index ("a" , "b" ), i , null ));
420434 result .topDocs (new TopDocsAndMaxScore (new TopDocs (new TotalHits (1 , TotalHits .Relation .EQUAL_TO ), new ScoreDoc [0 ]), number ),
421435 new DocValueFormat [0 ]);
422- InternalAggregations aggs = new InternalAggregations (Arrays . asList (new InternalMax ("test" , (double ) number ,
436+ InternalAggregations aggs = new InternalAggregations (Collections . singletonList (new InternalMax ("test" , (double ) number ,
423437 DocValueFormat .RAW , Collections .emptyList (), Collections .emptyMap ())));
424438 result .aggregations (aggs );
425439 result .setShardIndex (i );
426440 result .size (1 );
427441 consumer .consumeResult (result );
428442 }
429443 SearchPhaseController .ReducedQueryPhase reduce = consumer .reduce ();
444+ assertFinalReduction (request );
430445 InternalMax internalMax = (InternalMax ) reduce .aggregations .asList ().get (0 );
431446 assertEquals (max .get (), internalMax .getValue (), 0.0D );
432447 assertEquals (0 , reduce .sortedTopDocs .scoreDocs .length );
@@ -441,7 +456,7 @@ public void testConsumerOnlyAggs() {
441456 public void testConsumerOnlyHits () {
442457 int expectedNumResults = randomIntBetween (1 , 100 );
443458 int bufferSize = randomIntBetween (2 , 200 );
444- SearchRequest request = new SearchRequest ();
459+ SearchRequest request = randomBoolean () ? new SearchRequest () : new SearchRequest ( "remote" );
445460 if (randomBoolean ()) {
446461 request .source (new SearchSourceBuilder ().size (randomIntBetween (1 , 10 )));
447462 }
@@ -460,6 +475,7 @@ public void testConsumerOnlyHits() {
460475 consumer .consumeResult (result );
461476 }
462477 SearchPhaseController .ReducedQueryPhase reduce = consumer .reduce ();
478+ assertFinalReduction (request );
463479 assertEquals (1 , reduce .sortedTopDocs .scoreDocs .length );
464480 assertEquals (max .get (), reduce .maxScore , 0.0f );
465481 assertEquals (expectedNumResults , reduce .totalHits .value );
@@ -470,6 +486,12 @@ public void testConsumerOnlyHits() {
470486 assertNull (reduce .sortedTopDocs .collapseValues );
471487 }
472488
489+ private void assertFinalReduction (SearchRequest searchRequest ) {
490+ assertThat (reductions .size (), greaterThanOrEqualTo (1 ));
491+ //the last reduction step was the final one only if no cluster alias was provided with the search request
492+ assertEquals (searchRequest .getLocalClusterAlias () == null , reductions .get (reductions .size () - 1 ));
493+ }
494+
473495 public void testNewSearchPhaseResults () {
474496 for (int i = 0 ; i < 10 ; i ++) {
475497 int expectedNumResults = randomIntBetween (1 , 10 );
@@ -540,7 +562,7 @@ public void testReduceTopNWithFromOffset() {
540562 public void testConsumerSortByField () {
541563 int expectedNumResults = randomIntBetween (1 , 100 );
542564 int bufferSize = randomIntBetween (2 , 200 );
543- SearchRequest request = new SearchRequest ();
565+ SearchRequest request = randomBoolean () ? new SearchRequest () : new SearchRequest ( "remote" );
544566 int size = randomIntBetween (1 , 10 );
545567 request .setBatchedReduceSize (bufferSize );
546568 InitialSearchPhase .ArraySearchPhaseResults <SearchPhaseResult > consumer =
@@ -560,6 +582,7 @@ public void testConsumerSortByField() {
560582 consumer .consumeResult (result );
561583 }
562584 SearchPhaseController .ReducedQueryPhase reduce = consumer .reduce ();
585+ assertFinalReduction (request );
563586 assertEquals (Math .min (expectedNumResults , size ), reduce .sortedTopDocs .scoreDocs .length );
564587 assertEquals (expectedNumResults , reduce .totalHits .value );
565588 assertEquals (max .get (), ((FieldDoc )reduce .sortedTopDocs .scoreDocs [0 ]).fields [0 ]);
@@ -574,7 +597,7 @@ public void testConsumerSortByField() {
574597 public void testConsumerFieldCollapsing () {
575598 int expectedNumResults = randomIntBetween (30 , 100 );
576599 int bufferSize = randomIntBetween (2 , 200 );
577- SearchRequest request = new SearchRequest ();
600+ SearchRequest request = randomBoolean () ? new SearchRequest () : new SearchRequest ( "remote" );
578601 int size = randomIntBetween (5 , 10 );
579602 request .setBatchedReduceSize (bufferSize );
580603 InitialSearchPhase .ArraySearchPhaseResults <SearchPhaseResult > consumer =
@@ -596,6 +619,7 @@ public void testConsumerFieldCollapsing() {
596619 consumer .consumeResult (result );
597620 }
598621 SearchPhaseController .ReducedQueryPhase reduce = consumer .reduce ();
622+ assertFinalReduction (request );
599623 assertEquals (3 , reduce .sortedTopDocs .scoreDocs .length );
600624 assertEquals (expectedNumResults , reduce .totalHits .value );
601625 assertEquals (a , ((FieldDoc )reduce .sortedTopDocs .scoreDocs [0 ]).fields [0 ]);
0 commit comments