@@ -571,19 +571,23 @@ static final class QueryPhaseResultConsumer extends ArraySearchPhaseResults<Sear
571571 private final int bufferSize ;
572572 private int index ;
573573 private final SearchPhaseController controller ;
574+ private final SearchProgressListener progressListener ;
574575 private int numReducePhases = 0 ;
575576 private final TopDocsStats topDocsStats ;
576577 private final boolean performFinalReduce ;
577578
578579 /**
579580 * Creates a new {@link QueryPhaseResultConsumer}
581+ * @param progressListener a progress listener to be notified when a successful response is received
582+ * and when a partial or final reduce has completed.
580583 * @param controller a controller instance to reduce the query response objects
581584 * @param expectedResultSize the expected number of query results. Corresponds to the number of shards queried
582585 * @param bufferSize the size of the reduce buffer. if the buffer size is smaller than the number of expected results
583586 * the buffer is used to incrementally reduce aggregation results before all shards responded.
584587 */
585- private QueryPhaseResultConsumer (SearchPhaseController controller , int expectedResultSize , int bufferSize ,
586- boolean hasTopDocs , boolean hasAggs , int trackTotalHitsUpTo , boolean performFinalReduce ) {
588+ private QueryPhaseResultConsumer (SearchProgressListener progressListener , SearchPhaseController controller ,
589+ int expectedResultSize , int bufferSize , boolean hasTopDocs , boolean hasAggs ,
590+ int trackTotalHitsUpTo , boolean performFinalReduce ) {
587591 super (expectedResultSize );
588592 if (expectedResultSize != 1 && bufferSize < 2 ) {
589593 throw new IllegalArgumentException ("buffer size must be >= 2 if there is more than one expected result" );
@@ -595,6 +599,7 @@ private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedR
595599 throw new IllegalArgumentException ("either aggs or top docs must be present" );
596600 }
597601 this .controller = controller ;
602+ this .progressListener = progressListener ;
598603 // no need to buffer anything if we have less expected results. in this case we don't consume any results ahead of time.
599604 this .aggsBuffer = new InternalAggregations [hasAggs ? bufferSize : 0 ];
600605 this .topDocsBuffer = new TopDocs [hasTopDocs ? bufferSize : 0 ];
@@ -610,6 +615,7 @@ public void consumeResult(SearchPhaseResult result) {
610615 super .consumeResult (result );
611616 QuerySearchResult queryResult = result .queryResult ();
612617 consumeInternal (queryResult );
618+ progressListener .notifyQueryResult (queryResult .getShardIndex ());
613619 }
614620
615621 private synchronized void consumeInternal (QuerySearchResult querySearchResult ) {
@@ -629,6 +635,10 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
629635 }
630636 numReducePhases ++;
631637 index = 1 ;
638+ if (hasAggs ) {
639+ progressListener .notifyPartialReduce (progressListener .searchShards (results .asList ()),
640+ topDocsStats .getTotalHits (), aggsBuffer [0 ], numReducePhases );
641+ }
632642 }
633643 final int i = index ++;
634644 if (hasAggs ) {
@@ -652,8 +662,11 @@ private synchronized List<TopDocs> getRemainingTopDocs() {
652662
653663 @ Override
654664 public ReducedQueryPhase reduce () {
655- return controller .reducedQueryPhase (results .asList (), getRemainingAggs (), getRemainingTopDocs (), topDocsStats ,
656- numReducePhases , false , performFinalReduce );
665+ ReducedQueryPhase reducePhase = controller .reducedQueryPhase (results .asList (),
666+ getRemainingAggs (), getRemainingTopDocs (), topDocsStats , numReducePhases , false , performFinalReduce );
667+ progressListener .notifyReduce (progressListener .searchShards (results .asList ()),
668+ reducePhase .totalHits , reducePhase .aggregations );
669+ return reducePhase ;
657670 }
658671
659672 /**
@@ -678,7 +691,9 @@ private int resolveTrackTotalHits(SearchRequest request) {
678691 /**
679692 * Returns a new ArraySearchPhaseResults instance. This might return an instance that reduces search responses incrementally.
680693 */
681- ArraySearchPhaseResults <SearchPhaseResult > newSearchPhaseResults (SearchRequest request , int numShards ) {
694+ ArraySearchPhaseResults <SearchPhaseResult > newSearchPhaseResults (SearchProgressListener listener ,
695+ SearchRequest request ,
696+ int numShards ) {
682697 SearchSourceBuilder source = request .source ();
683698 boolean isScrollRequest = request .scroll () != null ;
684699 final boolean hasAggs = source != null && source .aggregations () != null ;
@@ -688,14 +703,24 @@ ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResults(SearchRequest r
688703 // no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
689704 if (request .getBatchedReduceSize () < numShards ) {
690705 // only use this if there are aggs and if there are more shards than we should reduce at once
691- return new QueryPhaseResultConsumer (this , numShards , request .getBatchedReduceSize (), hasTopDocs , hasAggs ,
706+ return new QueryPhaseResultConsumer (listener , this , numShards , request .getBatchedReduceSize (), hasTopDocs , hasAggs ,
692707 trackTotalHitsUpTo , request .isFinalReduce ());
693708 }
694709 }
695710 return new ArraySearchPhaseResults <SearchPhaseResult >(numShards ) {
711+ @ Override
712+ void consumeResult (SearchPhaseResult result ) {
713+ super .consumeResult (result );
714+ listener .notifyQueryResult (result .queryResult ().getShardIndex ());
715+ }
716+
696717 @ Override
697718 ReducedQueryPhase reduce () {
698- return reducedQueryPhase (results .asList (), isScrollRequest , trackTotalHitsUpTo , request .isFinalReduce ());
719+ List <SearchPhaseResult > resultList = results .asList ();
720+ final ReducedQueryPhase reducePhase =
721+ reducedQueryPhase (resultList , isScrollRequest , trackTotalHitsUpTo , request .isFinalReduce ());
722+ listener .notifyReduce (listener .searchShards (resultList ), reducePhase .totalHits , reducePhase .aggregations );
723+ return reducePhase ;
699724 }
700725 };
701726 }
0 commit comments