1 change: 0 additions & 1 deletion buildSrc/src/main/resources/checkstyle_suppressions.xml
Original file line number Diff line number Diff line change
@@ -157,7 +157,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]ingest[/\\]SimulatePipelineRequestBuilder.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]ingest[/\\]SimulatePipelineTransportAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]MultiSearchRequestBuilder.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchResponse.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]ShardSearchFailure.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]TransportClearScrollAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]TransportMultiSearchAction.java" checks="LineLength" />
Original file line number Diff line number Diff line change
@@ -53,6 +53,6 @@ protected void doExecute(SearchRequest request, ActionListener<SearchResponse> l
new SearchHit[0], 0L, 0.0f),
new InternalAggregations(Collections.emptyList()),
new Suggest(Collections.emptyList()),
new SearchProfileShardResults(Collections.emptyMap()), false, false), "", 1, 1, 0, new ShardSearchFailure[0]));
new SearchProfileShardResults(Collections.emptyMap()), false, false, 1), "", 1, 1, 0, new ShardSearchFailure[0]));
}
}
Original file line number Diff line number Diff line change
@@ -465,26 +465,30 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr
* @param queryResults a list of non-null query shard results
*/
public final ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults) {
return reducedQueryPhase(queryResults, null);
return reducedQueryPhase(queryResults, null, 0);
}

/**
* Reduces the given query results and consumes all aggregations and profile results.
* @param queryResults a list of non-null query shard results
* @param bufferdAggs a list of pre-collected / buffered aggregations. if this list is non-null all aggregations have been consumed
* from all non-null query results.
* @param numReducePhases the number of non-final reduce phases applied to the query results.
* @see QuerySearchResult#consumeAggs()
* @see QuerySearchResult#consumeProfileResult()
*/
private ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults,
List<InternalAggregations> bufferdAggs) {
List<InternalAggregations> bufferdAggs, int numReducePhases) {
assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases;
numReducePhases++; // increment for this phase
long totalHits = 0;
long fetchHits = 0;
float maxScore = Float.NEGATIVE_INFINITY;
boolean timedOut = false;
Boolean terminatedEarly = null;
if (queryResults.isEmpty()) { // early terminate we have nothing to reduce
return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, null, null, null, null);
return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, null, null, null, null,
numReducePhases);
}
final QuerySearchResult firstResult = queryResults.get(0).value.queryResult();
final boolean hasSuggest = firstResult.suggest() != null;
@@ -493,6 +497,7 @@ private ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entry<? e
final List<InternalAggregations> aggregationsList;
if (bufferdAggs != null) {
consumeAggs = false;
assert numReducePhases > 1 : "num reduce phases must be > 1 but was: " + numReducePhases;
// we already have results from intermediate reduces and just need to perform the final reduce
assert firstResult.hasAggs() : "firstResult has no aggs but we got non null buffered aggs?";
aggregationsList = bufferdAggs;
@@ -548,7 +553,7 @@ private ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entry<? e
firstResult.pipelineAggregators(), reduceContext);
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, firstResult, suggest, aggregations,
shardResults);
shardResults, numReducePhases);
}


@@ -597,10 +602,15 @@ public static final class ReducedQueryPhase {
final InternalAggregations aggregations;
// the reduced profile results
final SearchProfileShardResults shardResults;
// the number of reduce phases
final int numReducePhases;

ReducedQueryPhase(long totalHits, long fetchHits, float maxScore, boolean timedOut, Boolean terminatedEarly,
QuerySearchResult oneResult, Suggest suggest, InternalAggregations aggregations,
SearchProfileShardResults shardResults) {
SearchProfileShardResults shardResults, int numReducePhases) {
if (numReducePhases <= 0) {
throw new IllegalArgumentException("at least one reduce phase must have been applied but was: " + numReducePhases);
}
this.totalHits = totalHits;
this.fetchHits = fetchHits;
if (Float.isInfinite(maxScore)) {
@@ -614,14 +624,15 @@ public static final class ReducedQueryPhase {
this.suggest = suggest;
this.aggregations = aggregations;
this.shardResults = shardResults;
this.numReducePhases = numReducePhases;
}

/**
* Creates a new search response from the given merged hits.
* @see #merge(boolean, ScoreDoc[], ReducedQueryPhase, AtomicArray)
*/
public InternalSearchResponse buildResponse(SearchHits hits) {
return new InternalSearchResponse(hits, aggregations, suggest, shardResults, timedOut, terminatedEarly);
return new InternalSearchResponse(hits, aggregations, suggest, shardResults, timedOut, terminatedEarly, numReducePhases);
}

/**
@@ -643,6 +654,7 @@ static final class QueryPhaseResultConsumer
private final InternalAggregations[] buffer;
private int index;
private final SearchPhaseController controller;
private int numReducePhases = 0;

/**
* Creates a new {@link QueryPhaseResultConsumer}
@@ -677,6 +689,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
if (index == buffer.length) {
InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(buffer));
Arrays.fill(buffer, null);
numReducePhases++;
buffer[0] = reducedAggs;
index = 1;
}
@@ -690,7 +703,7 @@ private synchronized List<InternalAggregations> getRemaining() {

@Override
public ReducedQueryPhase reduce() {
return controller.reducedQueryPhase(results.asList(), getRemaining());
return controller.reducedQueryPhase(results.asList(), getRemaining(), numReducePhases);
}

/**
@@ -699,6 +712,8 @@ public ReducedQueryPhase reduce() {
int getNumBuffered() {
return index;
}

int getNumReducePhases() { return numReducePhases; }
}

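The consumer above is what makes the new numReducePhases bookkeeping meaningful: every time its buffer of batchedReduceSize partial aggregations fills up, they are collapsed into one intermediate result that goes back into slot 0, and the final call to reducedQueryPhase adds one more phase on top. The following is a minimal, generic sketch of that buffering pattern, not the Elasticsearch implementation; the BatchedReducer class and its combine operator are illustrative assumptions.

```java
import java.util.function.BinaryOperator;

/**
 * Illustrative sketch of batched reduction; not the Elasticsearch implementation.
 * Partial results are buffered, and once the buffer is full they are merged into
 * a single intermediate result so memory stays bounded by the batch size.
 */
final class BatchedReducer<T> {
    private final Object[] buffer;           // holds at most batchSize partial results
    private final BinaryOperator<T> combine;  // merges two partial results
    private int index = 0;
    private int numReducePhases = 0;          // counts intermediate reduces

    BatchedReducer(int batchSize, BinaryOperator<T> combine) {
        if (batchSize <= 1) {
            throw new IllegalArgumentException("batchSize must be >= 2");
        }
        this.buffer = new Object[batchSize];
        this.combine = combine;
    }

    synchronized void consume(T partial) {
        if (index == buffer.length) {
            // buffer is full: collapse it into one intermediate result in slot 0
            buffer[0] = reduceBuffer();
            index = 1;
            numReducePhases++;
        }
        buffer[index++] = partial;
    }

    /** Final reduce over whatever is left in the buffer; counts as one more phase. */
    synchronized T reduce() {
        numReducePhases++;
        return reduceBuffer();
    }

    synchronized int getNumReducePhases() {
        return numReducePhases;
    }

    @SuppressWarnings("unchecked")
    private T reduceBuffer() {
        T acc = (T) buffer[0];
        for (int i = 1; i < index; i++) {
            acc = combine.apply(acc, (T) buffer[i]);
        }
        return acc;
    }
}
```

For example, with a batch size of 5 and nine partial results the sketch performs one intermediate reduce plus the final one, so getNumReducePhases() returns 2, mirroring the count the real consumer hands to reducedQueryPhase.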
/**
@@ -707,9 +722,9 @@ int getNumBuffered() {
InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> newSearchPhaseResults(SearchRequest request, int numShards) {
SearchSourceBuilder source = request.source();
if (source != null && source.aggregations() != null) {
if (request.getReduceUpTo() < numShards) {
if (request.getBatchedReduceSize() < numShards) {
// only use this if there are aggs and if there are more shards than we should reduce at once
return new QueryPhaseResultConsumer(this, numShards, request.getReduceUpTo());
return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize());
}
}
return new InitialSearchPhase.SearchPhaseResults(numShards) {
Original file line number Diff line number Diff line change
@@ -71,7 +71,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest

private Scroll scroll;

private int reduceUpTo = 512;
private int batchedReduceSize = 512;

private String[] types = Strings.EMPTY_ARRAY;

@@ -281,19 +281,19 @@ public Boolean requestCache() {
* Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection
* mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.
*/
public void setReduceUpTo(int reduceUpTo) {
if (reduceUpTo <= 1) {
throw new IllegalArgumentException("reduceUpTo must be >= 2");
public void setBatchedReduceSize(int batchedReduceSize) {
if (batchedReduceSize <= 1) {
throw new IllegalArgumentException("batchedReduceSize must be >= 2");
}
this.reduceUpTo = reduceUpTo;
this.batchedReduceSize = batchedReduceSize;
}

/**
* Returns the number of shard results that should be reduced at once on the coordinating node. This value should be used as a
* protection mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.
*/
public int getReduceUpTo() {
return reduceUpTo;
public int getBatchedReduceSize() {
return batchedReduceSize;
}

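As a usage sketch for the setter and getter above: the index pattern, the aggregation, and the SearchSourceBuilder wiring are assumptions for illustration, while setBatchedReduceSize and getBatchedReduceSize are the methods introduced in this diff (the same knob is exposed over REST as the batched_reduce_size parameter further down).

```java
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

class BatchedReduceSizeExample {
    static SearchRequest requestAgainstManyShards() {
        SearchRequest request = new SearchRequest("logs-*"); // hypothetical index pattern
        request.source(new SearchSourceBuilder()
                .size(0)
                .aggregation(AggregationBuilders.terms("by_host").field("host")));
        // Reduce at most 64 shard results at a time on the coordinating node
        // instead of the default 512; the setter above rejects values <= 1.
        request.setBatchedReduceSize(64);
        assert request.getBatchedReduceSize() == 64;
        return request;
    }
}
```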
/**
@@ -343,7 +343,7 @@ public void readFrom(StreamInput in) throws IOException {
indicesOptions = IndicesOptions.readIndicesOptions(in);
requestCache = in.readOptionalBoolean();
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
reduceUpTo = in.readVInt();
batchedReduceSize = in.readVInt();
}
}

@@ -363,7 +363,7 @@ public void writeTo(StreamOutput out) throws IOException {
indicesOptions.writeIndicesOptions(out);
out.writeOptionalBoolean(requestCache);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeVInt(reduceUpTo);
out.writeVInt(batchedReduceSize);
}
}

Original file line number Diff line number Diff line change
@@ -524,8 +524,12 @@ private SearchSourceBuilder sourceBuilder() {
return request.source();
}

public SearchRequestBuilder setReduceUpTo(int reduceUpTo) {
this.request.setReduceUpTo(reduceUpTo);
/**
* Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection
* mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.
*/
public SearchRequestBuilder setBatchedReduceSize(int batchedReduceSize) {
this.request.setBatchedReduceSize(batchedReduceSize);
return this;
}
}
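The same setting through the fluent builder, as a brief sketch; the client instance, index name, and aggregation are assumed, while prepareSearch, setSize, addAggregation, and get are standard builder calls:

```java
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.aggregations.AggregationBuilders;

class BatchedReduceBuilderExample {
    static SearchResponse run(Client client) {
        return client.prepareSearch("logs-*")
                .setSize(0)
                .addAggregation(AggregationBuilders.terms("by_host").field("host"))
                .setBatchedReduceSize(64) // method added in this change
                .get();
    }
}
```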
Original file line number Diff line number Diff line change
@@ -61,7 +61,8 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
public SearchResponse() {
}

public SearchResponse(InternalSearchResponse internalResponse, String scrollId, int totalShards, int successfulShards, long tookInMillis, ShardSearchFailure[] shardFailures) {
public SearchResponse(InternalSearchResponse internalResponse, String scrollId, int totalShards, int successfulShards,
long tookInMillis, ShardSearchFailure[] shardFailures) {
this.internalResponse = internalResponse;
this.scrollId = scrollId;
this.totalShards = totalShards;
@@ -106,6 +107,13 @@ public Boolean isTerminatedEarly() {
return internalResponse.terminatedEarly();
}

/**
* Returns the number of reduce phases applied to obtain this search response
*/
public int getNumReducePhases() {
return internalResponse.getNumReducePhases();
}

/**
* How long the search took.
*/
@@ -172,13 +180,6 @@ public void scrollId(String scrollId) {
return internalResponse.profile();
}

static final class Fields {
static final String _SCROLL_ID = "_scroll_id";
static final String TOOK = "took";
static final String TIMED_OUT = "timed_out";
static final String TERMINATED_EARLY = "terminated_early";
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@@ -189,14 +190,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
if (scrollId != null) {
builder.field(Fields._SCROLL_ID, scrollId);
builder.field("_scroll_id", scrollId);
}
builder.field(Fields.TOOK, tookInMillis);
builder.field(Fields.TIMED_OUT, isTimedOut());
builder.field("took", tookInMillis);
builder.field("timed_out", isTimedOut());
if (isTerminatedEarly() != null) {
builder.field(Fields.TERMINATED_EARLY, isTerminatedEarly());
builder.field("terminated_early", isTerminatedEarly());
}
if (getNumReducePhases() != 1) {
builder.field("num_reduce_phases", getNumReducePhases());
}
RestActions.buildBroadcastShardsHeader(builder, params, getTotalShards(), getSuccessfulShards(), getFailedShards(), getShardFailures());
RestActions.buildBroadcastShardsHeader(builder, params, getTotalShards(), getSuccessfulShards(), getFailedShards(),
getShardFailures());
internalResponse.toXContent(builder, params);
return builder;
}
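Since the field is only rendered when more than one reduce phase ran, existing responses are unchanged, while a search that went through batched reduction gains a top-level num_reduce_phases entry. A small consumer-side sketch (the client and request are assumed; getNumReducePhases is the accessor added above):

```java
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;

class NumReducePhasesExample {
    static void logReducePhases(Client client, SearchRequest request) {
        SearchResponse response = client.search(request).actionGet();
        // 1 means only the final reduce ran; a larger value means intermediate,
        // batched reduces happened on the coordinating node, and the JSON body
        // then also carries "num_reduce_phases" alongside "took" and "timed_out".
        if (response.getNumReducePhases() > 1) {
            System.out.println("batched reduction ran in " + response.getNumReducePhases() + " phases");
        }
    }
}
```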
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.rest.action.search;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
@@ -94,6 +93,9 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r
searchRequest.source().parseXContent(context);
}

final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize());
searchRequest.setBatchedReduceSize(batchedReduceSize);

// do not allow 'query_and_fetch' or 'dfs_query_and_fetch' search types
// from the REST layer. these modes are an internal optimization and should
// not be specified explicitly by the user.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.search.internal;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@@ -38,7 +39,7 @@
public class InternalSearchResponse implements Streamable, ToXContent {

public static InternalSearchResponse empty() {
return new InternalSearchResponse(SearchHits.empty(), null, null, null, false, null);
return new InternalSearchResponse(SearchHits.empty(), null, null, null, false, null, 1);
}

private SearchHits hits;
@@ -53,17 +54,21 @@ public static InternalSearchResponse empty() {

private Boolean terminatedEarly = null;

private int numReducePhases = 1;

private InternalSearchResponse() {
}

public InternalSearchResponse(SearchHits hits, InternalAggregations aggregations, Suggest suggest,
SearchProfileShardResults profileResults, boolean timedOut, Boolean terminatedEarly) {
SearchProfileShardResults profileResults, boolean timedOut, Boolean terminatedEarly,
int numReducePhases) {
this.hits = hits;
this.aggregations = aggregations;
this.suggest = suggest;
this.profileResults = profileResults;
this.timedOut = timedOut;
this.terminatedEarly = terminatedEarly;
this.numReducePhases = numReducePhases;
}

public boolean timedOut() {
@@ -86,6 +91,13 @@ public Suggest suggest() {
return suggest;
}

/**
* Returns the number of reduce phases applied to obtain this search response
*/
public int getNumReducePhases() {
return numReducePhases;
}

/**
* Returns the profile results for this search response (including all shards).
* An empty map is returned if profiling was not enabled
@@ -132,6 +144,11 @@ public void readFrom(StreamInput in) throws IOException {
timedOut = in.readBoolean();
terminatedEarly = in.readOptionalBoolean();
profileResults = in.readOptionalWriteable(SearchProfileShardResults::new);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
numReducePhases = in.readVInt();
} else {
numReducePhases = 1;
}
}

@Override
@@ -152,5 +169,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(timedOut);
out.writeOptionalBoolean(terminatedEarly);
out.writeOptionalWriteable(profileResults);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeVInt(numReducePhases);
}
}
}
Original file line number Diff line number Diff line change
@@ -446,7 +446,7 @@ public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable comman
// Now we can simulate a response and check the delay that we used for the task
SearchHit hit = new SearchHit(0, "id", new Text("type"), emptyMap());
SearchHits hits = new SearchHits(new SearchHit[] { hit }, 0, 0);
InternalSearchResponse internalResponse = new InternalSearchResponse(hits, null, null, null, false, false);
InternalSearchResponse internalResponse = new InternalSearchResponse(hits, null, null, null, false, false, 1);
SearchResponse searchResponse = new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), null);

if (randomBoolean()) {