5353import org .junit .Before ;
5454
5555import java .util .ArrayList ;
56- import java .util .Arrays ;
5756import java .util .Collections ;
5857import java .util .HashMap ;
5958import java .util .List ;
6059import java .util .Map ;
6160import java .util .Optional ;
61+ import java .util .concurrent .CopyOnWriteArrayList ;
6262import java .util .concurrent .atomic .AtomicInteger ;
6363import java .util .stream .Collectors ;
6464import java .util .stream .Stream ;
6565
6666import static org .hamcrest .Matchers .equalTo ;
67+ import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
6768import static org .hamcrest .Matchers .instanceOf ;
6869import static org .hamcrest .Matchers .lessThanOrEqualTo ;
6970import static org .hamcrest .Matchers .not ;
7071
7172public class SearchPhaseControllerTests extends ESTestCase {
7273 private SearchPhaseController searchPhaseController ;
74+ private List <Boolean > reductions ;
7375
7476 @ Before
7577 public void setup () {
78+ reductions = new CopyOnWriteArrayList <>();
7679 searchPhaseController = new SearchPhaseController (
77- (b ) -> new InternalAggregation .ReduceContext (BigArrays .NON_RECYCLING_INSTANCE , null , b ));
80+ (finalReduce ) -> {
81+ reductions .add (finalReduce );
82+ return new InternalAggregation .ReduceContext (BigArrays .NON_RECYCLING_INSTANCE , null , finalReduce );
83+ });
7884 }
7985
8086 public void testSort () {
@@ -154,7 +160,7 @@ public void testMerge() {
154160 AtomicArray <SearchPhaseResult > queryResults = generateQueryResults (nShards , suggestions , queryResultSize , false );
155161 for (boolean trackTotalHits : new boolean [] {true , false }) {
156162 SearchPhaseController .ReducedQueryPhase reducedQueryPhase =
157- searchPhaseController .reducedQueryPhase (queryResults .asList (), false , trackTotalHits );
163+ searchPhaseController .reducedQueryPhase (queryResults .asList (), false , trackTotalHits , true );
158164 AtomicArray <SearchPhaseResult > fetchResults = generateFetchResults (nShards ,
159165 reducedQueryPhase .sortedTopDocs .scoreDocs , reducedQueryPhase .suggest );
160166 InternalSearchResponse mergedResponse = searchPhaseController .merge (false ,
@@ -302,45 +308,52 @@ private static AtomicArray<SearchPhaseResult> generateFetchResults(int nShards,
302308
303309 public void testConsumer () {
304310 int bufferSize = randomIntBetween (2 , 3 );
305- SearchRequest request = new SearchRequest ();
311+ SearchRequest request = randomBoolean () ? new SearchRequest () : new SearchRequest ( "remote" );
306312 request .source (new SearchSourceBuilder ().aggregation (AggregationBuilders .avg ("foo" )));
307313 request .setBatchedReduceSize (bufferSize );
308314 InitialSearchPhase .ArraySearchPhaseResults <SearchPhaseResult > consumer = searchPhaseController .newSearchPhaseResults (request , 3 );
315+ assertEquals (0 , reductions .size ());
309316 QuerySearchResult result = new QuerySearchResult (0 , new SearchShardTarget ("node" , new Index ("a" , "b" ), 0 , null ));
310317 result .topDocs (new TopDocs (0 , new ScoreDoc [0 ], 0.0F ), new DocValueFormat [0 ]);
311- InternalAggregations aggs = new InternalAggregations (Arrays . asList (new InternalMax ("test" , 1.0D , DocValueFormat .RAW ,
318+ InternalAggregations aggs = new InternalAggregations (Collections . singletonList (new InternalMax ("test" , 1.0D , DocValueFormat .RAW ,
312319 Collections .emptyList (), Collections .emptyMap ())));
313320 result .aggregations (aggs );
314321 result .setShardIndex (0 );
315322 consumer .consumeResult (result );
316323
317324 result = new QuerySearchResult (1 , new SearchShardTarget ("node" , new Index ("a" , "b" ), 0 , null ));
318325 result .topDocs (new TopDocs (0 , new ScoreDoc [0 ], 0.0F ), new DocValueFormat [0 ]);
319- aggs = new InternalAggregations (Arrays . asList (new InternalMax ("test" , 3.0D , DocValueFormat .RAW ,
326+ aggs = new InternalAggregations (Collections . singletonList (new InternalMax ("test" , 3.0D , DocValueFormat .RAW ,
320327 Collections .emptyList (), Collections .emptyMap ())));
321328 result .aggregations (aggs );
322329 result .setShardIndex (2 );
323330 consumer .consumeResult (result );
324331
325332 result = new QuerySearchResult (1 , new SearchShardTarget ("node" , new Index ("a" , "b" ), 0 , null ));
326333 result .topDocs (new TopDocs (0 , new ScoreDoc [0 ], 0.0F ), new DocValueFormat [0 ]);
327- aggs = new InternalAggregations (Arrays . asList (new InternalMax ("test" , 2.0D , DocValueFormat .RAW ,
334+ aggs = new InternalAggregations (Collections . singletonList (new InternalMax ("test" , 2.0D , DocValueFormat .RAW ,
328335 Collections .emptyList (), Collections .emptyMap ())));
329336 result .aggregations (aggs );
330337 result .setShardIndex (1 );
331338 consumer .consumeResult (result );
332- int numTotalReducePhases = 1 ;
339+ final int numTotalReducePhases ;
333340 if (bufferSize == 2 ) {
334341 assertThat (consumer , instanceOf (SearchPhaseController .QueryPhaseResultConsumer .class ));
335342 assertEquals (1 , ((SearchPhaseController .QueryPhaseResultConsumer )consumer ).getNumReducePhases ());
336343 assertEquals (2 , ((SearchPhaseController .QueryPhaseResultConsumer )consumer ).getNumBuffered ());
337- numTotalReducePhases ++;
344+ assertEquals (1 , reductions .size ());
345+ assertEquals (false , reductions .get (0 ));
346+ numTotalReducePhases = 2 ;
338347 } else {
339348 assertThat (consumer , not (instanceOf (SearchPhaseController .QueryPhaseResultConsumer .class )));
349+ assertEquals (0 , reductions .size ());
350+ numTotalReducePhases = 1 ;
340351 }
341352
342353 SearchPhaseController .ReducedQueryPhase reduce = consumer .reduce ();
343354 assertEquals (numTotalReducePhases , reduce .numReducePhases );
355+ assertEquals (numTotalReducePhases , reductions .size ());
356+ assertFinalReduction (request );
344357 InternalMax max = (InternalMax ) reduce .aggregations .asList ().get (0 );
345358 assertEquals (3.0D , max .getValue (), 0.0D );
346359 assertFalse (reduce .sortedTopDocs .isSortedByField );
@@ -353,7 +366,7 @@ public void testConsumerConcurrently() throws InterruptedException {
353366 int expectedNumResults = randomIntBetween (1 , 100 );
354367 int bufferSize = randomIntBetween (2 , 200 );
355368
356- SearchRequest request = new SearchRequest ();
369+ SearchRequest request = randomBoolean () ? new SearchRequest () : new SearchRequest ( "remote" );
357370 request .source (new SearchSourceBuilder ().aggregation (AggregationBuilders .avg ("foo" )));
358371 request .setBatchedReduceSize (bufferSize );
359372 InitialSearchPhase .ArraySearchPhaseResults <SearchPhaseResult > consumer =
@@ -367,7 +380,7 @@ public void testConsumerConcurrently() throws InterruptedException {
367380 max .updateAndGet (prev -> Math .max (prev , number ));
368381 QuerySearchResult result = new QuerySearchResult (id , new SearchShardTarget ("node" , new Index ("a" , "b" ), id , null ));
369382 result .topDocs (new TopDocs (1 , new ScoreDoc [] {new ScoreDoc (0 , number )}, number ), new DocValueFormat [0 ]);
370- InternalAggregations aggs = new InternalAggregations (Arrays . asList (new InternalMax ("test" , (double ) number ,
383+ InternalAggregations aggs = new InternalAggregations (Collections . singletonList (new InternalMax ("test" , (double ) number ,
371384 DocValueFormat .RAW , Collections .emptyList (), Collections .emptyMap ())));
372385 result .aggregations (aggs );
373386 result .setShardIndex (id );
@@ -381,6 +394,7 @@ public void testConsumerConcurrently() throws InterruptedException {
381394 threads [i ].join ();
382395 }
383396 SearchPhaseController .ReducedQueryPhase reduce = consumer .reduce ();
397+ assertFinalReduction (request );
384398 InternalMax internalMax = (InternalMax ) reduce .aggregations .asList ().get (0 );
385399 assertEquals (max .get (), internalMax .getValue (), 0.0D );
386400 assertEquals (1 , reduce .sortedTopDocs .scoreDocs .length );
@@ -396,7 +410,7 @@ public void testConsumerConcurrently() throws InterruptedException {
396410 public void testConsumerOnlyAggs () {
397411 int expectedNumResults = randomIntBetween (1 , 100 );
398412 int bufferSize = randomIntBetween (2 , 200 );
399- SearchRequest request = new SearchRequest ();
413+ SearchRequest request = randomBoolean () ? new SearchRequest () : new SearchRequest ( "remote" );
400414 request .source (new SearchSourceBuilder ().aggregation (AggregationBuilders .avg ("foo" )).size (0 ));
401415 request .setBatchedReduceSize (bufferSize );
402416 InitialSearchPhase .ArraySearchPhaseResults <SearchPhaseResult > consumer =
@@ -407,14 +421,15 @@ public void testConsumerOnlyAggs() {
407421 max .updateAndGet (prev -> Math .max (prev , number ));
408422 QuerySearchResult result = new QuerySearchResult (i , new SearchShardTarget ("node" , new Index ("a" , "b" ), i , null ));
409423 result .topDocs (new TopDocs (1 , new ScoreDoc [0 ], number ), new DocValueFormat [0 ]);
410- InternalAggregations aggs = new InternalAggregations (Arrays . asList (new InternalMax ("test" , (double ) number ,
424+ InternalAggregations aggs = new InternalAggregations (Collections . singletonList (new InternalMax ("test" , (double ) number ,
411425 DocValueFormat .RAW , Collections .emptyList (), Collections .emptyMap ())));
412426 result .aggregations (aggs );
413427 result .setShardIndex (i );
414428 result .size (1 );
415429 consumer .consumeResult (result );
416430 }
417431 SearchPhaseController .ReducedQueryPhase reduce = consumer .reduce ();
432+ assertFinalReduction (request );
418433 InternalMax internalMax = (InternalMax ) reduce .aggregations .asList ().get (0 );
419434 assertEquals (max .get (), internalMax .getValue (), 0.0D );
420435 assertEquals (0 , reduce .sortedTopDocs .scoreDocs .length );
@@ -429,7 +444,7 @@ public void testConsumerOnlyAggs() {
429444 public void testConsumerOnlyHits () {
430445 int expectedNumResults = randomIntBetween (1 , 100 );
431446 int bufferSize = randomIntBetween (2 , 200 );
432- SearchRequest request = new SearchRequest ();
447+ SearchRequest request = randomBoolean () ? new SearchRequest () : new SearchRequest ( "remote" );
433448 if (randomBoolean ()) {
434449 request .source (new SearchSourceBuilder ().size (randomIntBetween (1 , 10 )));
435450 }
@@ -447,6 +462,7 @@ public void testConsumerOnlyHits() {
447462 consumer .consumeResult (result );
448463 }
449464 SearchPhaseController .ReducedQueryPhase reduce = consumer .reduce ();
465+ assertFinalReduction (request );
450466 assertEquals (1 , reduce .sortedTopDocs .scoreDocs .length );
451467 assertEquals (max .get (), reduce .maxScore , 0.0f );
452468 assertEquals (expectedNumResults , reduce .totalHits );
@@ -457,6 +473,12 @@ public void testConsumerOnlyHits() {
457473 assertNull (reduce .sortedTopDocs .collapseValues );
458474 }
459475
476+ private void assertFinalReduction (SearchRequest searchRequest ) {
477+ assertThat (reductions .size (), greaterThanOrEqualTo (1 ));
478+ //the last reduction step was the final one only if no cluster alias was provided with the search request
479+ assertEquals (searchRequest .getLocalClusterAlias () == null , reductions .get (reductions .size () - 1 ));
480+ }
481+
460482 public void testNewSearchPhaseResults () {
461483 for (int i = 0 ; i < 10 ; i ++) {
462484 int expectedNumResults = randomIntBetween (1 , 10 );
@@ -526,7 +548,7 @@ public void testReduceTopNWithFromOffset() {
526548 public void testConsumerSortByField () {
527549 int expectedNumResults = randomIntBetween (1 , 100 );
528550 int bufferSize = randomIntBetween (2 , 200 );
529- SearchRequest request = new SearchRequest ();
551+ SearchRequest request = randomBoolean () ? new SearchRequest () : new SearchRequest ( "remote" );
530552 int size = randomIntBetween (1 , 10 );
531553 request .setBatchedReduceSize (bufferSize );
532554 InitialSearchPhase .ArraySearchPhaseResults <SearchPhaseResult > consumer =
@@ -546,6 +568,7 @@ public void testConsumerSortByField() {
546568 consumer .consumeResult (result );
547569 }
548570 SearchPhaseController .ReducedQueryPhase reduce = consumer .reduce ();
571+ assertFinalReduction (request );
549572 assertEquals (Math .min (expectedNumResults , size ), reduce .sortedTopDocs .scoreDocs .length );
550573 assertEquals (expectedNumResults , reduce .totalHits );
551574 assertEquals (max .get (), ((FieldDoc )reduce .sortedTopDocs .scoreDocs [0 ]).fields [0 ]);
@@ -560,7 +583,7 @@ public void testConsumerSortByField() {
560583 public void testConsumerFieldCollapsing () {
561584 int expectedNumResults = randomIntBetween (30 , 100 );
562585 int bufferSize = randomIntBetween (2 , 200 );
563- SearchRequest request = new SearchRequest ();
586+ SearchRequest request = randomBoolean () ? new SearchRequest () : new SearchRequest ( "remote" );
564587 int size = randomIntBetween (5 , 10 );
565588 request .setBatchedReduceSize (bufferSize );
566589 InitialSearchPhase .ArraySearchPhaseResults <SearchPhaseResult > consumer =
@@ -582,6 +605,7 @@ public void testConsumerFieldCollapsing() {
582605 consumer .consumeResult (result );
583606 }
584607 SearchPhaseController .ReducedQueryPhase reduce = consumer .reduce ();
608+ assertFinalReduction (request );
585609 assertEquals (3 , reduce .sortedTopDocs .scoreDocs .length );
586610 assertEquals (expectedNumResults , reduce .totalHits );
587611 assertEquals (a , ((FieldDoc )reduce .sortedTopDocs .scoreDocs [0 ]).fields [0 ]);
0 commit comments