diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml
index 54f148894dabf..309fd865a22ec 100644
--- a/buildSrc/src/main/resources/checkstyle_suppressions.xml
+++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml
@@ -157,7 +157,6 @@
-
diff --git a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java
index 1e09e890a0b67..77e7cdab93707 100644
--- a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java
+++ b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java
@@ -53,6 +53,6 @@ protected void doExecute(SearchRequest request, ActionListener<SearchResponse> l
new SearchHit[0], 0L, 0.0f),
new InternalAggregations(Collections.emptyList()),
new Suggest(Collections.emptyList()),
- new SearchProfileShardResults(Collections.emptyMap()), false, false), "", 1, 1, 0, new ShardSearchFailure[0]));
+ new SearchProfileShardResults(Collections.emptyMap()), false, false, 1), "", 1, 1, 0, new ShardSearchFailure[0]));
}
}
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
index fe39e74142612..eb30df1b699c8 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
@@ -465,7 +465,7 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr
* @param queryResults a list of non-null query shard results
*/
public final ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults) {
- return reducedQueryPhase(queryResults, null);
+ return reducedQueryPhase(queryResults, null, 0);
}
/**
@@ -473,18 +473,22 @@ public final ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entr
* @param queryResults a list of non-null query shard results
* @param bufferdAggs a list of pre-collected / buffered aggregations. if this list is non-null all aggregations have been consumed
* from all non-null query results.
+ * @param numReducePhases the number of non-final reduce phases applied to the query results.
* @see QuerySearchResult#consumeAggs()
* @see QuerySearchResult#consumeProfileResult()
*/
private ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults,
- List<InternalAggregations> bufferdAggs) {
+ List<InternalAggregations> bufferdAggs, int numReducePhases) {
+ assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases;
+ numReducePhases++; // increment for this phase
long totalHits = 0;
long fetchHits = 0;
float maxScore = Float.NEGATIVE_INFINITY;
boolean timedOut = false;
Boolean terminatedEarly = null;
if (queryResults.isEmpty()) { // early terminate we have nothing to reduce
- return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, null, null, null, null);
+ return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, null, null, null, null,
+ numReducePhases);
}
final QuerySearchResult firstResult = queryResults.get(0).value.queryResult();
final boolean hasSuggest = firstResult.suggest() != null;
@@ -493,6 +497,7 @@ private ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entry<? e
final List<InternalAggregations> aggregationsList;
if (bufferdAggs != null) {
consumeAggs = false;
+ assert numReducePhases > 1 : "num reduce phases must be > 1 but was: " + numReducePhases;
// we already have results from intermediate reduces and just need to perform the final reduce
assert firstResult.hasAggs() : "firstResult has no aggs but we got non null buffered aggs?";
aggregationsList = bufferdAggs;
@@ -548,7 +553,7 @@ private ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entry<? e
firstResult.pipelineAggregators(), reduceContext);
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, firstResult, suggest, aggregations,
- shardResults);
+ shardResults, numReducePhases);
}
@@ -597,10 +602,15 @@ public static final class ReducedQueryPhase {
final InternalAggregations aggregations;
// the reduced profile results
final SearchProfileShardResults shardResults;
+ // the number of reduce phases
+ final int numReducePhases;
ReducedQueryPhase(long totalHits, long fetchHits, float maxScore, boolean timedOut, Boolean terminatedEarly,
QuerySearchResult oneResult, Suggest suggest, InternalAggregations aggregations,
- SearchProfileShardResults shardResults) {
+ SearchProfileShardResults shardResults, int numReducePhases) {
+ if (numReducePhases <= 0) {
+ throw new IllegalArgumentException("at least one reduce phase must have been applied but was: " + numReducePhases);
+ }
this.totalHits = totalHits;
this.fetchHits = fetchHits;
if (Float.isInfinite(maxScore)) {
@@ -614,6 +624,7 @@ public static final class ReducedQueryPhase {
this.suggest = suggest;
this.aggregations = aggregations;
this.shardResults = shardResults;
+ this.numReducePhases = numReducePhases;
}
/**
@@ -621,7 +632,7 @@ public static final class ReducedQueryPhase {
* @see #merge(boolean, ScoreDoc[], ReducedQueryPhase, AtomicArray)
*/
public InternalSearchResponse buildResponse(SearchHits hits) {
- return new InternalSearchResponse(hits, aggregations, suggest, shardResults, timedOut, terminatedEarly);
+ return new InternalSearchResponse(hits, aggregations, suggest, shardResults, timedOut, terminatedEarly, numReducePhases);
}
/**
@@ -643,6 +654,7 @@ static final class QueryPhaseResultConsumer
private final InternalAggregations[] buffer;
private int index;
private final SearchPhaseController controller;
+ private int numReducePhases = 0;
/**
* Creates a new {@link QueryPhaseResultConsumer}
@@ -677,6 +689,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
if (index == buffer.length) {
InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(buffer));
Arrays.fill(buffer, null);
+ numReducePhases++;
buffer[0] = reducedAggs;
index = 1;
}
@@ -690,7 +703,7 @@ private synchronized List<InternalAggregations> getRemaining() {
@Override
public ReducedQueryPhase reduce() {
- return controller.reducedQueryPhase(results.asList(), getRemaining());
+ return controller.reducedQueryPhase(results.asList(), getRemaining(), numReducePhases);
}
/**
@@ -699,6 +712,8 @@ public ReducedQueryPhase reduce() {
int getNumBuffered() {
return index;
}
+
+ int getNumReducePhases() { return numReducePhases; }
}
/**
@@ -707,9 +722,9 @@ int getNumBuffered() {
InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> newSearchPhaseResults(SearchRequest request, int numShards) {
SearchSourceBuilder source = request.source();
if (source != null && source.aggregations() != null) {
- if (request.getReduceUpTo() < numShards) {
+ if (request.getBatchedReduceSize() < numShards) {
// only use this if there are aggs and if there are more shards than we should reduce at once
- return new QueryPhaseResultConsumer(this, numShards, request.getReduceUpTo());
+ return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize());
}
}
return new InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider>(numShards) {
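For reference, the reduce-phase accounting introduced above works as follows: QueryPhaseResultConsumer performs one partial reduce every time its buffer of batchedReduceSize aggregation results fills up, and reducedQueryPhase(...) then counts the final reduce on top of that. A minimal standalone sketch (not part of this change; the helper name is made up for illustration) reproduces the count:

    // Mirrors the counting logic of QueryPhaseResultConsumer plus reducedQueryPhase():
    // one partial reduce each time the buffer fills, plus exactly one final reduce.
    static int expectedNumReducePhases(int numShardResults, int batchedReduceSize) {
        int buffered = 0;
        int numReducePhases = 0;
        for (int i = 0; i < numShardResults; i++) {
            if (buffered == batchedReduceSize) {
                numReducePhases++; // partial reduce collapses the buffer into a single entry
                buffered = 1;
            }
            buffered++;
        }
        return numReducePhases + 1; // the final reduce always runs once
    }

With 5 shards and batched_reduce_size set to 2 this yields 4, which is exactly the num_reduce_phases value asserted in the new REST test below.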
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java
index 0c1189d1d694e..6e2701ad41720 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java
@@ -71,7 +71,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
private Scroll scroll;
- private int reduceUpTo = 512;
+ private int batchedReduceSize = 512;
private String[] types = Strings.EMPTY_ARRAY;
@@ -281,19 +281,19 @@ public Boolean requestCache() {
* Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection
* mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.
*/
- public void setReduceUpTo(int reduceUpTo) {
- if (reduceUpTo <= 1) {
- throw new IllegalArgumentException("reduceUpTo must be >= 2");
+ public void setBatchedReduceSize(int batchedReduceSize) {
+ if (batchedReduceSize <= 1) {
+ throw new IllegalArgumentException("batchedReduceSize must be >= 2");
}
- this.reduceUpTo = reduceUpTo;
+ this.batchedReduceSize = batchedReduceSize;
}
/**
* Returns the number of shard results that should be reduced at once on the coordinating node. This value should be used as a
* protection mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.
*/
- public int getReduceUpTo() {
- return reduceUpTo;
+ public int getBatchedReduceSize() {
+ return batchedReduceSize;
}
/**
@@ -343,7 +343,7 @@ public void readFrom(StreamInput in) throws IOException {
indicesOptions = IndicesOptions.readIndicesOptions(in);
requestCache = in.readOptionalBoolean();
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
- reduceUpTo = in.readVInt();
+ batchedReduceSize = in.readVInt();
}
}
@@ -363,7 +363,7 @@ public void writeTo(StreamOutput out) throws IOException {
indicesOptions.writeIndicesOptions(out);
out.writeOptionalBoolean(requestCache);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
- out.writeVInt(reduceUpTo);
+ out.writeVInt(batchedReduceSize);
}
}
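A minimal usage sketch for the renamed setter (illustrative only; the index name is made up). Values of 1 or less are rejected with the IllegalArgumentException shown above:

    SearchRequest searchRequest = new SearchRequest("test_1");
    searchRequest.setBatchedReduceSize(2);    // reduce at most 2 shard results at a time
    // searchRequest.setBatchedReduceSize(1); // throws: "batchedReduceSize must be >= 2"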
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java
index c6c5e0fbf3d60..ffe2c1b20c516 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java
@@ -524,8 +524,12 @@ private SearchSourceBuilder sourceBuilder() {
return request.source();
}
- public SearchRequestBuilder setReduceUpTo(int reduceUpTo) {
- this.request.setReduceUpTo(reduceUpTo);
+ /**
+ * Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection
+ * mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.
+ */
+ public SearchRequestBuilder setBatchedReduceSize(int batchedReduceSize) {
+ this.request.setBatchedReduceSize(batchedReduceSize);
return this;
}
}
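The same knob via the builder, as a sketch (assumes an org.elasticsearch.client.Client named client; the index and aggregation names are made up):

    client.prepareSearch("test_1")
          .setBatchedReduceSize(2)
          .setSize(0)
          .addAggregation(AggregationBuilders.terms("str_terms").field("str"))
          .get();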
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchResponse.java b/core/src/main/java/org/elasticsearch/action/search/SearchResponse.java
index 4b8ba5e64b695..54d8eab99e727 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchResponse.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchResponse.java
@@ -61,7 +61,8 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
public SearchResponse() {
}
- public SearchResponse(InternalSearchResponse internalResponse, String scrollId, int totalShards, int successfulShards, long tookInMillis, ShardSearchFailure[] shardFailures) {
+ public SearchResponse(InternalSearchResponse internalResponse, String scrollId, int totalShards, int successfulShards,
+ long tookInMillis, ShardSearchFailure[] shardFailures) {
this.internalResponse = internalResponse;
this.scrollId = scrollId;
this.totalShards = totalShards;
@@ -106,6 +107,13 @@ public Boolean isTerminatedEarly() {
return internalResponse.terminatedEarly();
}
+ /**
+ * Returns the number of reduce phases applied to obtain this search response
+ */
+ public int getNumReducePhases() {
+ return internalResponse.getNumReducePhases();
+ }
+
/**
* How long the search took.
*/
@@ -172,13 +180,6 @@ public void scrollId(String scrollId) {
return internalResponse.profile();
}
- static final class Fields {
- static final String _SCROLL_ID = "_scroll_id";
- static final String TOOK = "took";
- static final String TIMED_OUT = "timed_out";
- static final String TERMINATED_EARLY = "terminated_early";
- }
-
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@@ -189,14 +190,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
if (scrollId != null) {
- builder.field(Fields._SCROLL_ID, scrollId);
+ builder.field("_scroll_id", scrollId);
}
- builder.field(Fields.TOOK, tookInMillis);
- builder.field(Fields.TIMED_OUT, isTimedOut());
+ builder.field("took", tookInMillis);
+ builder.field("timed_out", isTimedOut());
if (isTerminatedEarly() != null) {
- builder.field(Fields.TERMINATED_EARLY, isTerminatedEarly());
+ builder.field("terminated_early", isTerminatedEarly());
+ }
+ if (getNumReducePhases() != 1) {
+ builder.field("num_reduce_phases", getNumReducePhases());
}
- RestActions.buildBroadcastShardsHeader(builder, params, getTotalShards(), getSuccessfulShards(), getFailedShards(), getShardFailures());
+ RestActions.buildBroadcastShardsHeader(builder, params, getTotalShards(), getSuccessfulShards(), getFailedShards(),
+ getShardFailures());
internalResponse.toXContent(builder, params);
return builder;
}
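Note that num_reduce_phases is only rendered when more than one reduce phase ran, so existing responses are unchanged; programmatic callers can always read the counter. A sketch (client and logger are assumed to exist):

    SearchResponse response = client.prepareSearch("test_1").setBatchedReduceSize(2).get();
    if (response.getNumReducePhases() > 1) {
        // the coordinating node reduced aggregations incrementally
        logger.info("search used {} reduce phases", response.getNumReducePhases());
    }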
diff --git a/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java
index bf8308202b7f9..89e2f23861c98 100644
--- a/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java
+++ b/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java
@@ -20,7 +20,6 @@
package org.elasticsearch.rest.action.search;
import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
@@ -94,6 +93,9 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r
searchRequest.source().parseXContent(context);
}
+ final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize());
+ searchRequest.setBatchedReduceSize(batchedReduceSize);
+
// do not allow 'query_and_fetch' or 'dfs_query_and_fetch' search types
// from the REST layer. these modes are an internal optimization and should
// not be specified explicitly by the user.
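On the REST layer the parameter is optional: paramAsInt falls back to the value already on the request, so omitting batched_reduce_size keeps the default of 512. A sketch using the low-level REST client (this assumes the performRequest(String, String, Map) overload of the 5.x client; the index and value are illustrative):

    Map<String, String> params = Collections.singletonMap("batched_reduce_size", "2");
    Response response = restClient.performRequest("GET", "/test_1/_search", params);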
diff --git a/core/src/main/java/org/elasticsearch/search/internal/InternalSearchResponse.java b/core/src/main/java/org/elasticsearch/search/internal/InternalSearchResponse.java
index d396d14e9837b..998b99a68dafb 100644
--- a/core/src/main/java/org/elasticsearch/search/internal/InternalSearchResponse.java
+++ b/core/src/main/java/org/elasticsearch/search/internal/InternalSearchResponse.java
@@ -19,6 +19,7 @@
package org.elasticsearch.search.internal;
+import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@@ -38,7 +39,7 @@
public class InternalSearchResponse implements Streamable, ToXContent {
public static InternalSearchResponse empty() {
- return new InternalSearchResponse(SearchHits.empty(), null, null, null, false, null);
+ return new InternalSearchResponse(SearchHits.empty(), null, null, null, false, null, 1);
}
private SearchHits hits;
@@ -53,17 +54,21 @@ public static InternalSearchResponse empty() {
private Boolean terminatedEarly = null;
+ private int numReducePhases = 1;
+
private InternalSearchResponse() {
}
public InternalSearchResponse(SearchHits hits, InternalAggregations aggregations, Suggest suggest,
- SearchProfileShardResults profileResults, boolean timedOut, Boolean terminatedEarly) {
+ SearchProfileShardResults profileResults, boolean timedOut, Boolean terminatedEarly,
+ int numReducePhases) {
this.hits = hits;
this.aggregations = aggregations;
this.suggest = suggest;
this.profileResults = profileResults;
this.timedOut = timedOut;
this.terminatedEarly = terminatedEarly;
+ this.numReducePhases = numReducePhases;
}
public boolean timedOut() {
@@ -86,6 +91,13 @@ public Suggest suggest() {
return suggest;
}
+ /**
+ * Returns the number of reduce phases applied to obtain this search response
+ */
+ public int getNumReducePhases() {
+ return numReducePhases;
+ }
+
/**
* Returns the profile results for this search response (including all shards).
* An empty map is returned if profiling was not enabled
@@ -132,6 +144,11 @@ public void readFrom(StreamInput in) throws IOException {
timedOut = in.readBoolean();
terminatedEarly = in.readOptionalBoolean();
profileResults = in.readOptionalWriteable(SearchProfileShardResults::new);
+ if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
+ numReducePhases = in.readVInt();
+ } else {
+ numReducePhases = 1;
+ }
}
@Override
@@ -152,5 +169,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(timedOut);
out.writeOptionalBoolean(terminatedEarly);
out.writeOptionalWriteable(profileResults);
+ if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
+ out.writeVInt(numReducePhases);
+ }
}
}
diff --git a/core/src/test/java/org/elasticsearch/action/bulk/byscroll/AsyncBulkByScrollActionTests.java b/core/src/test/java/org/elasticsearch/action/bulk/byscroll/AsyncBulkByScrollActionTests.java
index 92ce5040e376b..1fafd847a3f20 100644
--- a/core/src/test/java/org/elasticsearch/action/bulk/byscroll/AsyncBulkByScrollActionTests.java
+++ b/core/src/test/java/org/elasticsearch/action/bulk/byscroll/AsyncBulkByScrollActionTests.java
@@ -446,7 +446,7 @@ public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable comman
// Now we can simulate a response and check the delay that we used for the task
SearchHit hit = new SearchHit(0, "id", new Text("type"), emptyMap());
SearchHits hits = new SearchHits(new SearchHit[] { hit }, 0, 0);
- InternalSearchResponse internalResponse = new InternalSearchResponse(hits, null, null, null, false, false);
+ InternalSearchResponse internalResponse = new InternalSearchResponse(hits, null, null, null, false, false, 1);
SearchResponse searchResponse = new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), null);
if (randomBoolean()) {
diff --git a/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java
index 0ac452e62bdaa..20e295561bbcd 100644
--- a/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java
+++ b/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java
@@ -79,7 +79,7 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(collapsedHits,
- null, null, null, false, null);
+ null, null, null, false, null, 1);
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
listener.onResponse(new MultiSearchResponse(new MultiSearchResponse.Item[]{
new MultiSearchResponse.Item(response, null)
@@ -91,7 +91,7 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL
SearchHits hits = new SearchHits(new SearchHit[]{new SearchHit(1, "ID", new Text("type"),
Collections.singletonMap("someField", new SearchHitField("someField", Collections.singletonList(collapseValue))))},
1, 1.0F);
- InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null);
+ InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
AtomicReference reference = new AtomicReference<>();
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, response, r ->
@@ -132,7 +132,7 @@ public void testFailOneItemFailsEntirePhase() throws IOException {
void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener<MultiSearchResponse> listener) {
assertTrue(executedMultiSearch.compareAndSet(false, true));
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(collapsedHits,
- null, null, null, false, null);
+ null, null, null, false, null, 1);
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
listener.onResponse(new MultiSearchResponse(new MultiSearchResponse.Item[]{
new MultiSearchResponse.Item(null, new RuntimeException("boom")),
@@ -146,7 +146,7 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL
new SearchHit(2, "ID2", new Text("type"),
Collections.singletonMap("someField", new SearchHitField("someField", Collections.singletonList(collapseValue))))}, 1,
1.0F);
- InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null);
+ InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
AtomicReference reference = new AtomicReference<>();
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, response, r ->
@@ -180,7 +180,7 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL
Collections.singletonMap("someField", new SearchHitField("someField", Collections.singletonList(null)))),
new SearchHit(2, "ID2", new Text("type"),
Collections.singletonMap("someField", new SearchHitField("someField", Collections.singletonList(null))))}, 1, 1.0F);
- InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null);
+ InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
AtomicReference reference = new AtomicReference<>();
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, response, r ->
diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
index 21d5b6aee9063..5270fd59ce993 100644
--- a/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
+++ b/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
@@ -29,10 +29,8 @@
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.AggregationBuilders;
-import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
-import org.elasticsearch.search.aggregations.metrics.sum.InternalSum;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.SearchHit;
@@ -245,7 +243,7 @@ public void testConsumer() {
int bufferSize = randomIntBetween(2, 3);
SearchRequest request = new SearchRequest();
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
- request.setReduceUpTo(bufferSize);
+ request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> consumer = searchPhaseController.newSearchPhaseResults(request, 3);
QuerySearchResult result = new QuerySearchResult(0, new SearchShardTarget("node", new Index("a", "b"), 0));
result.topDocs(new TopDocs(0, new ScoreDoc[0], 0.0F), new DocValueFormat[0]);
@@ -267,15 +265,18 @@ public void testConsumer() {
Collections.emptyList(), Collections.emptyMap())));
result.aggregations(aggs);
consumer.consumeResult(1, result);
-
+ int numTotalReducePhases = 1;
if (bufferSize == 2) {
assertThat(consumer, instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class));
+ assertEquals(1, ((SearchPhaseController.QueryPhaseResultConsumer)consumer).getNumReducePhases());
assertEquals(2, ((SearchPhaseController.QueryPhaseResultConsumer)consumer).getNumBuffered());
+ numTotalReducePhases++;
} else {
assertThat(consumer, not(instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class)));
}
SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
+ assertEquals(numTotalReducePhases, reduce.numReducePhases);
InternalMax max = (InternalMax) reduce.aggregations.asList().get(0);
assertEquals(3.0D, max.getValue(), 0.0D);
}
@@ -286,7 +287,7 @@ public void testConsumerConcurrently() throws InterruptedException {
SearchRequest request = new SearchRequest();
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
- request.setReduceUpTo(bufferSize);
+ request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> consumer =
searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
AtomicInteger max = new AtomicInteger();
@@ -322,7 +323,7 @@ public void testNewSearchPhaseResults() {
if ((hasAggs = randomBoolean())) {
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
}
- request.setReduceUpTo(bufferSize);
+ request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> consumer
= searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
if (hasAggs && expectedNumResults > bufferSize) {
diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java
index a7173dc4c22b3..3f2163f25d927 100644
--- a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java
+++ b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java
@@ -38,7 +38,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.function.Function;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
@@ -88,7 +87,7 @@ public static Client client() {
*/
@Override
public SearchRequestBuilder prepareSearch(String... indices) {
- return this.in.prepareSearch(indices).setReduceUpTo(512);
+ return this.in.prepareSearch(indices).setBatchedReduceSize(512);
}
};
}
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json
index 328383e16399f..dc5fda57439f5 100644
--- a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json
+++ b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json
@@ -158,6 +158,11 @@
"request_cache": {
"type" : "boolean",
"description" : "Specify if request cache should be used for this request or not, defaults to index level setting"
+ },
+ "batched_reduce_size" : {
+ "type" : "number",
+ "description" : "The number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.",
+ "default" : 512
}
}
},
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search/120_batch_reduce_size.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/search/120_batch_reduce_size.yaml
new file mode 100644
index 0000000000000..4c2054c2964bd
--- /dev/null
+++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search/120_batch_reduce_size.yaml
@@ -0,0 +1,70 @@
+setup:
+ - do:
+ indices.create:
+ index: test_1
+ body:
+ settings:
+ number_of_shards: 5
+ number_of_replicas: 0
+ mappings:
+ test:
+ properties:
+ str:
+ type: keyword
+
+---
+"batched_reduce_size lower limit":
+ - skip:
+ version: " - 5.99.99"
+ reason: this was added in 6.0.0
+ - do:
+ catch: /batchedReduceSize must be >= 2/
+ search:
+ index: test_1
+ batched_reduce_size: 1
+
+
+---
+"batched_reduce_size 2 with 5 shards":
+ - skip:
+ version: " - 5.99.99"
+ reason: this was added in 6.0.0
+ - do:
+ index:
+ index: test_1
+ type: test
+ id: 1
+ body: { "str" : "abc" }
+
+ - do:
+ index:
+ index: test_1
+ type: test
+ id: 2
+ body: { "str": "abc" }
+
+ - do:
+ index:
+ index: test_1
+ type: test
+ id: 3
+ body: { "str": "bcd" }
+ - do:
+ indices.refresh: {}
+
+ - do:
+ search:
+ batched_reduce_size: 2
+ body: { "size" : 0, "aggs" : { "str_terms" : { "terms" : { "field" : "str" } } } }
+
+ - match: { num_reduce_phases: 4 }
+ - match: { hits.total: 3 }
+ - length: { aggregations.str_terms.buckets: 2 }
+ - match: { aggregations.str_terms.buckets.0.key: "abc" }
+ - is_false: aggregations.str_terms.buckets.0.key_as_string
+ - match: { aggregations.str_terms.buckets.0.doc_count: 2 }
+ - match: { aggregations.str_terms.buckets.1.key: "bcd" }
+ - is_false: aggregations.str_terms.buckets.1.key_as_string
+ - match: { aggregations.str_terms.buckets.1.doc_count: 1 }
+
+
diff --git a/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java b/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java
index 77fb115c5f717..a7d9a72e6b77c 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java
@@ -36,7 +36,7 @@ public class RandomizingClient extends FilterClient {
private final SearchType defaultSearchType;
private final String defaultPreference;
- private final int reduceUpTo;
+ private final int batchedReduceSize;
public RandomizingClient(Client client, Random random) {
@@ -54,13 +54,14 @@ public RandomizingClient(Client client, Random random) {
} else {
defaultPreference = null;
}
- this.reduceUpTo = 2 + random.nextInt(10);
+ this.batchedReduceSize = 2 + random.nextInt(10);
}
@Override
public SearchRequestBuilder prepareSearch(String... indices) {
- return in.prepareSearch(indices).setSearchType(defaultSearchType).setPreference(defaultPreference).setReduceUpTo(reduceUpTo);
+ return in.prepareSearch(indices).setSearchType(defaultSearchType).setPreference(defaultPreference)
+ .setBatchedReduceSize(batchedReduceSize);
}
@Override