Skip to content

Commit 0a69c69

Browse files
committed
Expose batched_reduce_size via _search (#23288)
In #23253 we added an the ability to incrementally reduce search results. This change exposes the parameter to control the batch since and therefore the memory consumption of a large search request.
1 parent 1627586 commit 0a69c69

File tree

15 files changed

+176
-55
lines changed

15 files changed

+176
-55
lines changed

buildSrc/src/main/resources/checkstyle_suppressions.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,6 @@
161161
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]ingest[/\\]SimulatePipelineRequestBuilder.java" checks="LineLength" />
162162
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]ingest[/\\]SimulatePipelineTransportAction.java" checks="LineLength" />
163163
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]MultiSearchRequestBuilder.java" checks="LineLength" />
164-
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchResponse.java" checks="LineLength" />
165164
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]ShardSearchFailure.java" checks="LineLength" />
166165
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]TransportClearScrollAction.java" checks="LineLength" />
167166
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]TransportMultiSearchAction.java" checks="LineLength" />

client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,6 @@ protected void doExecute(SearchRequest request, ActionListener<SearchResponse> l
5353
new SearchHit[0], 0L, 0.0f),
5454
new InternalAggregations(Collections.emptyList()),
5555
new Suggest(Collections.emptyList()),
56-
new SearchProfileShardResults(Collections.emptyMap()), false, false), "", 1, 1, 0, new ShardSearchFailure[0]));
56+
new SearchProfileShardResults(Collections.emptyMap()), false, false, 1), "", 1, 1, 0, new ShardSearchFailure[0]));
5757
}
5858
}

core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -468,26 +468,30 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr
468468
* @param queryResults a list of non-null query shard results
469469
*/
470470
public final ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults) {
471-
return reducedQueryPhase(queryResults, null);
471+
return reducedQueryPhase(queryResults, null, 0);
472472
}
473473

474474
/**
475475
* Reduces the given query results and consumes all aggregations and profile results.
476476
* @param queryResults a list of non-null query shard results
477477
* @param bufferdAggs a list of pre-collected / buffered aggregations. if this list is non-null all aggregations have been consumed
478478
* from all non-null query results.
479+
* @param numReducePhases the number of non-final reduce phases applied to the query results.
479480
* @see QuerySearchResult#consumeAggs()
480481
* @see QuerySearchResult#consumeProfileResult()
481482
*/
482483
private ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults,
483-
List<InternalAggregations> bufferdAggs) {
484+
List<InternalAggregations> bufferdAggs, int numReducePhases) {
485+
assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases;
486+
numReducePhases++; // increment for this phase
484487
long totalHits = 0;
485488
long fetchHits = 0;
486489
float maxScore = Float.NEGATIVE_INFINITY;
487490
boolean timedOut = false;
488491
Boolean terminatedEarly = null;
489492
if (queryResults.isEmpty()) { // early terminate we have nothing to reduce
490-
return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, null, null, null, null);
493+
return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, null, null, null, null,
494+
numReducePhases);
491495
}
492496
final QuerySearchResult firstResult = queryResults.get(0).value.queryResult();
493497
final boolean hasSuggest = firstResult.suggest() != null;
@@ -496,6 +500,7 @@ private ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entry<? e
496500
final List<InternalAggregations> aggregationsList;
497501
if (bufferdAggs != null) {
498502
consumeAggs = false;
503+
assert numReducePhases > 1 : "num reduce phases must be > 1 but was: " + numReducePhases;
499504
// we already have results from intermediate reduces and just need to perform the final reduce
500505
assert firstResult.hasAggs() : "firstResult has no aggs but we got non null buffered aggs?";
501506
aggregationsList = bufferdAggs;
@@ -551,7 +556,7 @@ private ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entry<? e
551556
firstResult.pipelineAggregators(), reduceContext);
552557
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
553558
return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, firstResult, suggest, aggregations,
554-
shardResults);
559+
shardResults, numReducePhases);
555560
}
556561

557562

@@ -600,10 +605,15 @@ public static final class ReducedQueryPhase {
600605
final InternalAggregations aggregations;
601606
// the reduced profile results
602607
final SearchProfileShardResults shardResults;
608+
// the number of reduces phases
609+
final int numReducePhases;
603610

604611
ReducedQueryPhase(long totalHits, long fetchHits, float maxScore, boolean timedOut, Boolean terminatedEarly,
605612
QuerySearchResult oneResult, Suggest suggest, InternalAggregations aggregations,
606-
SearchProfileShardResults shardResults) {
613+
SearchProfileShardResults shardResults, int numReducePhases) {
614+
if (numReducePhases <= 0) {
615+
throw new IllegalArgumentException("at least one reduce phase must have been applied but was: " + numReducePhases);
616+
}
607617
this.totalHits = totalHits;
608618
this.fetchHits = fetchHits;
609619
if (Float.isInfinite(maxScore)) {
@@ -617,14 +627,15 @@ public static final class ReducedQueryPhase {
617627
this.suggest = suggest;
618628
this.aggregations = aggregations;
619629
this.shardResults = shardResults;
630+
this.numReducePhases = numReducePhases;
620631
}
621632

622633
/**
623634
* Creates a new search response from the given merged hits.
624635
* @see #merge(boolean, ScoreDoc[], ReducedQueryPhase, AtomicArray)
625636
*/
626637
public InternalSearchResponse buildResponse(SearchHits hits) {
627-
return new InternalSearchResponse(hits, aggregations, suggest, shardResults, timedOut, terminatedEarly);
638+
return new InternalSearchResponse(hits, aggregations, suggest, shardResults, timedOut, terminatedEarly, numReducePhases);
628639
}
629640

630641
/**
@@ -646,6 +657,7 @@ static final class QueryPhaseResultConsumer
646657
private final InternalAggregations[] buffer;
647658
private int index;
648659
private final SearchPhaseController controller;
660+
private int numReducePhases = 0;
649661

650662
/**
651663
* Creates a new {@link QueryPhaseResultConsumer}
@@ -680,6 +692,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
680692
if (index == buffer.length) {
681693
InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(buffer));
682694
Arrays.fill(buffer, null);
695+
numReducePhases++;
683696
buffer[0] = reducedAggs;
684697
index = 1;
685698
}
@@ -693,7 +706,7 @@ private synchronized List<InternalAggregations> getRemaining() {
693706

694707
@Override
695708
public ReducedQueryPhase reduce() {
696-
return controller.reducedQueryPhase(results.asList(), getRemaining());
709+
return controller.reducedQueryPhase(results.asList(), getRemaining(), numReducePhases);
697710
}
698711

699712
/**
@@ -702,6 +715,8 @@ public ReducedQueryPhase reduce() {
702715
int getNumBuffered() {
703716
return index;
704717
}
718+
719+
int getNumReducePhases() { return numReducePhases; }
705720
}
706721

707722
/**
@@ -710,9 +725,9 @@ int getNumBuffered() {
710725
InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> newSearchPhaseResults(SearchRequest request, int numShards) {
711726
SearchSourceBuilder source = request.source();
712727
if (source != null && source.aggregations() != null) {
713-
if (request.getReduceUpTo() < numShards) {
728+
if (request.getBatchedReduceSize() < numShards) {
714729
// only use this if there are aggs and if there are more shards than we should reduce at once
715-
return new QueryPhaseResultConsumer(this, numShards, request.getReduceUpTo());
730+
return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize());
716731
}
717732
}
718733
return new InitialSearchPhase.SearchPhaseResults(numShards) {

core/src/main/java/org/elasticsearch/action/search/SearchRequest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
7171

7272
private Scroll scroll;
7373

74-
private int reduceUpTo = 512;
74+
private int batchedReduceSize = 512;
7575

7676
private String[] types = Strings.EMPTY_ARRAY;
7777

@@ -281,19 +281,19 @@ public Boolean requestCache() {
281281
* Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection
282282
* mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.
283283
*/
284-
public void setReduceUpTo(int reduceUpTo) {
285-
if (reduceUpTo <= 1) {
286-
throw new IllegalArgumentException("reduceUpTo must be >= 2");
284+
public void setBatchedReduceSize(int batchedReduceSize) {
285+
if (batchedReduceSize <= 1) {
286+
throw new IllegalArgumentException("batchedReduceSize must be >= 2");
287287
}
288-
this.reduceUpTo = reduceUpTo;
288+
this.batchedReduceSize = batchedReduceSize;
289289
}
290290

291291
/**
292292
* Returns the number of shard results that should be reduced at once on the coordinating node. This value should be used as a
293293
* protection mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.
294294
*/
295-
public int getReduceUpTo() {
296-
return reduceUpTo;
295+
public int getBatchedReduceSize() {
296+
return batchedReduceSize;
297297
}
298298

299299
/**
@@ -343,7 +343,7 @@ public void readFrom(StreamInput in) throws IOException {
343343
indicesOptions = IndicesOptions.readIndicesOptions(in);
344344
requestCache = in.readOptionalBoolean();
345345
if (in.getVersion().onOrAfter(Version.V_5_4_0_UNRELEASED)) {
346-
reduceUpTo = in.readVInt();
346+
batchedReduceSize = in.readVInt();
347347
}
348348
}
349349

@@ -363,7 +363,7 @@ public void writeTo(StreamOutput out) throws IOException {
363363
indicesOptions.writeIndicesOptions(out);
364364
out.writeOptionalBoolean(requestCache);
365365
if (out.getVersion().onOrAfter(Version.V_5_4_0_UNRELEASED)) {
366-
out.writeVInt(reduceUpTo);
366+
out.writeVInt(batchedReduceSize);
367367
}
368368
}
369369

core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -524,8 +524,12 @@ private SearchSourceBuilder sourceBuilder() {
524524
return request.source();
525525
}
526526

527-
public SearchRequestBuilder setReduceUpTo(int reduceUpTo) {
528-
this.request.setReduceUpTo(reduceUpTo);
527+
/**
528+
* Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection
529+
* mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.
530+
*/
531+
public SearchRequestBuilder setBatchedReduceSize(int batchedReduceSize) {
532+
this.request.setBatchedReduceSize(batchedReduceSize);
529533
return this;
530534
}
531535
}

core/src/main/java/org/elasticsearch/action/search/SearchResponse.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
6161
public SearchResponse() {
6262
}
6363

64-
public SearchResponse(InternalSearchResponse internalResponse, String scrollId, int totalShards, int successfulShards, long tookInMillis, ShardSearchFailure[] shardFailures) {
64+
public SearchResponse(InternalSearchResponse internalResponse, String scrollId, int totalShards, int successfulShards,
65+
long tookInMillis, ShardSearchFailure[] shardFailures) {
6566
this.internalResponse = internalResponse;
6667
this.scrollId = scrollId;
6768
this.totalShards = totalShards;
@@ -106,6 +107,13 @@ public Boolean isTerminatedEarly() {
106107
return internalResponse.terminatedEarly();
107108
}
108109

110+
/**
111+
* Returns the number of reduce phases applied to obtain this search response
112+
*/
113+
public int getNumReducePhases() {
114+
return internalResponse.getNumReducePhases();
115+
}
116+
109117
/**
110118
* How long the search took.
111119
*/
@@ -172,13 +180,6 @@ public void scrollId(String scrollId) {
172180
return internalResponse.profile();
173181
}
174182

175-
static final class Fields {
176-
static final String _SCROLL_ID = "_scroll_id";
177-
static final String TOOK = "took";
178-
static final String TIMED_OUT = "timed_out";
179-
static final String TERMINATED_EARLY = "terminated_early";
180-
}
181-
182183
@Override
183184
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
184185
builder.startObject();
@@ -189,14 +190,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
189190

190191
public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
191192
if (scrollId != null) {
192-
builder.field(Fields._SCROLL_ID, scrollId);
193+
builder.field("_scroll_id", scrollId);
193194
}
194-
builder.field(Fields.TOOK, tookInMillis);
195-
builder.field(Fields.TIMED_OUT, isTimedOut());
195+
builder.field("took", tookInMillis);
196+
builder.field("timed_out", isTimedOut());
196197
if (isTerminatedEarly() != null) {
197-
builder.field(Fields.TERMINATED_EARLY, isTerminatedEarly());
198+
builder.field("terminated_early", isTerminatedEarly());
199+
}
200+
if (getNumReducePhases() != 1) {
201+
builder.field("num_reduce_phases", getNumReducePhases());
198202
}
199-
RestActions.buildBroadcastShardsHeader(builder, params, getTotalShards(), getSuccessfulShards(), getFailedShards(), getShardFailures());
203+
RestActions.buildBroadcastShardsHeader(builder, params, getTotalShards(), getSuccessfulShards(), getFailedShards(),
204+
getShardFailures());
200205
internalResponse.toXContent(builder, params);
201206
return builder;
202207
}

core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package org.elasticsearch.rest.action.search;
2121

2222
import org.elasticsearch.action.search.SearchRequest;
23-
import org.elasticsearch.action.search.SearchType;
2423
import org.elasticsearch.action.support.IndicesOptions;
2524
import org.elasticsearch.client.node.NodeClient;
2625
import org.elasticsearch.common.Strings;
@@ -97,6 +96,9 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r
9796
searchRequest.source().parseXContent(context);
9897
}
9998

99+
final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize());
100+
searchRequest.setBatchedReduceSize(batchedReduceSize);
101+
100102
// do not allow 'query_and_fetch' or 'dfs_query_and_fetch' search types
101103
// from the REST layer. these modes are an internal optimization and should
102104
// not be specified explicitly by the user.

core/src/main/java/org/elasticsearch/search/internal/InternalSearchResponse.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.search.internal;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.common.io.stream.StreamInput;
2324
import org.elasticsearch.common.io.stream.StreamOutput;
2425
import org.elasticsearch.common.io.stream.Streamable;
@@ -43,7 +44,7 @@
4344
public class InternalSearchResponse implements Streamable, ToXContent {
4445

4546
public static InternalSearchResponse empty() {
46-
return new InternalSearchResponse(SearchHits.empty(), null, null, null, false, null);
47+
return new InternalSearchResponse(SearchHits.empty(), null, null, null, false, null, 1);
4748
}
4849

4950
private SearchHits hits;
@@ -58,17 +59,21 @@ public static InternalSearchResponse empty() {
5859

5960
private Boolean terminatedEarly = null;
6061

62+
private int numReducePhases = 1;
63+
6164
private InternalSearchResponse() {
6265
}
6366

6467
public InternalSearchResponse(SearchHits hits, InternalAggregations aggregations, Suggest suggest,
65-
SearchProfileShardResults profileResults, boolean timedOut, Boolean terminatedEarly) {
68+
SearchProfileShardResults profileResults, boolean timedOut, Boolean terminatedEarly,
69+
int numReducePhases) {
6670
this.hits = hits;
6771
this.aggregations = aggregations;
6872
this.suggest = suggest;
6973
this.profileResults = profileResults;
7074
this.timedOut = timedOut;
7175
this.terminatedEarly = terminatedEarly;
76+
this.numReducePhases = numReducePhases;
7277
}
7378

7479
public boolean timedOut() {
@@ -91,6 +96,13 @@ public Suggest suggest() {
9196
return suggest;
9297
}
9398

99+
/**
100+
* Returns the number of reduce phases applied to obtain this search response
101+
*/
102+
public int getNumReducePhases() {
103+
return numReducePhases;
104+
}
105+
94106
/**
95107
* Returns the profile results for this search response (including all shards).
96108
* An empty map is returned if profiling was not enabled
@@ -137,6 +149,11 @@ public void readFrom(StreamInput in) throws IOException {
137149
timedOut = in.readBoolean();
138150
terminatedEarly = in.readOptionalBoolean();
139151
profileResults = in.readOptionalWriteable(SearchProfileShardResults::new);
152+
if (in.getVersion().onOrAfter(Version.V_5_4_0_UNRELEASED)) {
153+
numReducePhases = in.readVInt();
154+
} else {
155+
numReducePhases = 1;
156+
}
140157
}
141158

142159
@Override
@@ -157,5 +174,8 @@ public void writeTo(StreamOutput out) throws IOException {
157174
out.writeBoolean(timedOut);
158175
out.writeOptionalBoolean(terminatedEarly);
159176
out.writeOptionalWriteable(profileResults);
177+
if (out.getVersion().onOrAfter(Version.V_5_4_0_UNRELEASED)) {
178+
out.writeVInt(numReducePhases);
179+
}
160180
}
161181
}

core/src/test/java/org/elasticsearch/action/bulk/byscroll/AsyncBulkByScrollActionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,7 @@ public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable comman
445445
// Now we can simulate a response and check the delay that we used for the task
446446
SearchHit hit = new SearchHit(0, "id", new Text("type"), emptyMap());
447447
SearchHits hits = new SearchHits(new SearchHit[] { hit }, 0, 0);
448-
InternalSearchResponse internalResponse = new InternalSearchResponse(hits, null, null, null, false, false);
448+
InternalSearchResponse internalResponse = new InternalSearchResponse(hits, null, null, null, false, false, 1);
449449
SearchResponse searchResponse = new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), null);
450450

451451
if (randomBoolean()) {

0 commit comments

Comments
 (0)