@@ -26,6 +26,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
@@ -49,7 +50,7 @@ protected void doExecute(Task task, SearchRequest request, ActionListener<Search
listener.onResponse(new SearchResponse(new InternalSearchResponse(
new SearchHits(
new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), 0.0f),
new InternalAggregations(Collections.emptyList()),
DelayableWriteable.referencing(new InternalAggregations(Collections.emptyList())),
new Suggest(Collections.emptyList()),
new SearchProfileShardResults(Collections.emptyMap()), false, false, 1),
"", 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY));
@@ -20,6 +20,7 @@
package org.elasticsearch.client;

import com.fasterxml.jackson.core.JsonParseException;

import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
@@ -238,7 +239,8 @@ public void testInfo() throws IOException {
}

public void testSearchScroll() throws IOException {
SearchResponse mockSearchResponse = new SearchResponse(new SearchResponseSections(SearchHits.empty(), InternalAggregations.EMPTY,
SearchResponse mockSearchResponse = new SearchResponse(new SearchResponseSections.Simple(
SearchHits.empty(), InternalAggregations.EMPTY,
null, false, false, null, 1), randomAlphaOfLengthBetween(5, 10), 5, 5, 0, 100, ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY);
mockResponse(mockSearchResponse);
@@ -49,6 +49,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
@@ -114,8 +115,9 @@ private static MockTransportService startTransport(
});
newService.registerRequestHandler(SearchAction.NAME, ThreadPool.Names.SAME, SearchRequest::new,
(request, channel, task) -> {
InternalSearchResponse response = new InternalSearchResponse(new SearchHits(new SearchHit[0],
new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN), InternalAggregations.EMPTY, null, null, false, null, 1);
InternalSearchResponse response = new InternalSearchResponse(
new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN),
DelayableWriteable.referencing(InternalAggregations.EMPTY), null, null, false, null, 1);
SearchResponse searchResponse = new SearchResponse(response, null, 1, 1, 0, 100, ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY);
channel.sendResponse(searchResponse);
@@ -614,7 +614,8 @@ public static final class ReducedQueryPhase {
* @see #merge(boolean, ReducedQueryPhase, Collection, IntFunction)
*/
public InternalSearchResponse buildResponse(SearchHits hits) {
return new InternalSearchResponse(hits, aggregations, suggest, shardResults, timedOut, terminatedEarly, numReducePhases);
return new InternalSearchResponse(hits, DelayableWriteable.referencing(aggregations), suggest, shardResults,
timedOut, terminatedEarly, numReducePhases);
}
}

@@ -697,14 +698,14 @@ public void consumeResult(SearchPhaseResult result) {
private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
if (querySearchResult.isNull() == false) {
if (index == bufferSize) {
InternalAggregations reducedAggs = null;
if (hasAggs) {
List<InternalAggregations> aggs = new ArrayList<>(aggsBuffer.length);
for (int i = 0; i < aggsBuffer.length; i++) {
aggs.add(aggsBuffer[i].get());
aggsBuffer[i] = null; // null the buffer so it can be GCed now.
}
reducedAggs = InternalAggregations.topLevelReduce(aggs, aggReduceContextBuilder.forPartialReduction());
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(
aggs, aggReduceContextBuilder.forPartialReduction());
aggsBuffer[0] = DelayableWriteable.referencing(reducedAggs)
.asSerialized(InternalAggregations::new, namedWriteableRegistry);
long previousBufferSize = aggsCurrentBufferSize;
@@ -724,7 +725,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
index = 1;
if (hasAggs || hasTopDocs) {
progressListener.notifyPartialReduce(SearchProgressListener.buildSearchShards(processedShards),
topDocsStats.getTotalHits(), reducedAggs, numReducePhases);
topDocsStats.getTotalHits(), hasAggs ? aggsBuffer[0] : null, numReducePhases);
}
}
final int i = index++;
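The hunk above is the heart of the change: each partial reduction is immediately re-serialized via DelayableWriteable.referencing(reducedAggs).asSerialized(...), so the expanded aggregation objects can be garbage collected between reduce rounds, and the next round pays the cost of rebuilding them through get(). The following stand-alone sketch illustrates that trade-off using plain Java serialization; the DelayedValue class below is a placeholder for illustration only, not the real DelayableWriteable.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

// Toy stand-in for the delayed-serialization idea: keep a value as bytes and
// only rebuild the object when a consumer actually asks for it.
final class DelayedValue<T extends Serializable> implements Supplier<T> {
    private final byte[] bytes;

    private DelayedValue(byte[] bytes) {
        this.bytes = bytes;
    }

    // Serialize eagerly so the original object can be dropped, mirroring
    // DelayableWriteable.referencing(value).asSerialized(...) in the hunk above.
    static <T extends Serializable> DelayedValue<T> serialized(T value) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(value);
            out.flush();
            return new DelayedValue<>(bos.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Rebuild the value on demand, mirroring the buffer's get() call above.
    @Override
    @SuppressWarnings("unchecked")
    public T get() {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}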
@@ -25,6 +25,7 @@
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.search.SearchResponse.Clusters;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregations;
@@ -77,10 +78,11 @@ protected void onQueryFailure(int shardIndex, SearchShardTarget shardTarget, Exc
*
* @param shards The list of shards that are part of this reduce.
* @param totalHits The total number of hits in this reduce.
* @param aggs The partial result for aggregations.
* @param aggs The partial result for aggregations stored in serialized form.
* @param reducePhase The version number for this reduce.
*/
protected void onPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {}
protected void onPartialReduce(List<SearchShard> shards, TotalHits totalHits, DelayableWriteable.Serialized<InternalAggregations> aggs,
int reducePhase) {}

/**
* Executed once when the final reduce is created.
@@ -134,7 +136,8 @@ final void notifyQueryFailure(int shardIndex, SearchShardTarget shardTarget, Exc
}
}

final void notifyPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
final void notifyPartialReduce(List<SearchShard> shards, TotalHits totalHits, DelayableWriteable.Serialized<InternalAggregations> aggs,
int reducePhase) {
try {
onPartialReduce(shards, totalHits, aggs, reducePhase);
} catch (Exception e) {
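With the new signature a progress listener receives the partially reduced aggregations in serialized form and only pays the deserialization cost if it actually needs them. A hedged sketch of an override against that signature, based on the get() accessor visible elsewhere in this diff; the class name and the body are illustrative, not part of this change.

import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.search.SearchProgressListener;
import org.elasticsearch.action.search.SearchShard;
import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.search.aggregations.InternalAggregations;

import java.util.List;

// Illustrative listener: the aggregations stay serialized until get() is called.
class PartialReduceInspectingListener extends SearchProgressListener {
    @Override
    protected void onPartialReduce(List<SearchShard> shards, TotalHits totalHits,
                                   DelayableWriteable.Serialized<InternalAggregations> aggs, int reducePhase) {
        if (aggs != null) {
            // Deserializes the buffered bytes; skip this call to keep the
            // partial-reduce notification cheap when the aggs are not needed.
            InternalAggregations expanded = aggs.get();
            // ... inspect "expanded" as needed ...
        }
    }
}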
@@ -24,6 +24,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@@ -77,7 +78,7 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb

public SearchResponse(StreamInput in) throws IOException {
super(in);
internalResponse = new InternalSearchResponse(in);
internalResponse = InternalSearchResponse.read(in);
totalShards = in.readVInt();
successfulShards = in.readVInt();
int size = in.readVInt();
@@ -358,7 +359,7 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
}
}
}
SearchResponseSections searchResponseSections = new SearchResponseSections(hits, aggs, suggest, timedOut, terminatedEarly,
SearchResponseSections searchResponseSections = new SearchResponseSections.Simple(hits, aggs, suggest, timedOut, terminatedEarly,
profile, numReducePhases);
return new SearchResponse(searchResponseSections, scrollId, totalShards, successfulShards, skippedShards, tookInMillis,
failures.toArray(ShardSearchFailure.EMPTY_ARRAY), clusters);
@@ -484,7 +485,7 @@ public String toString() {
static SearchResponse empty(Supplier<Long> tookInMillisSupplier, Clusters clusters) {
SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), Float.NaN);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchHits,
InternalAggregations.EMPTY, null, null, false, null, 0);
DelayableWriteable.referencing(InternalAggregations.EMPTY), null, null, false, null, 0);
return new SearchResponse(internalSearchResponse, null, 0, 0, 0, tookInMillisSupplier.get(),
ShardSearchFailure.EMPTY_ARRAY, clusters);
}
@@ -30,6 +30,7 @@
import org.elasticsearch.action.search.SearchPhaseController.TopDocsStats;
import org.elasticsearch.action.search.SearchResponse.Clusters;
import org.elasticsearch.action.search.TransportSearchAction.SearchTimeProvider;
import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchHit;
@@ -132,6 +133,7 @@ SearchResponse getMergedResponse(Clusters clusters) {
TopDocsStats topDocsStats = new TopDocsStats(trackTotalHitsUpTo);

for (SearchResponse searchResponse : searchResponses) {
InternalSearchResponse internal = (InternalSearchResponse) searchResponse.getInternalResponse();
totalShards += searchResponse.getTotalShards();
skippedShards += searchResponse.getSkippedShards();
successfulShards += searchResponse.getSuccessfulShards();
@@ -141,8 +143,8 @@

profileResults.putAll(searchResponse.getProfileResults());

if (searchResponse.getAggregations() != null) {
InternalAggregations internalAggs = (InternalAggregations) searchResponse.getAggregations();
InternalAggregations internalAggs = internal.consumeAggregations();
if (internalAggs != null) {
aggs.add(internalAggs);
}

@@ -200,8 +202,8 @@ SearchResponse getMergedResponse(Clusters clusters) {
SearchProfileShardResults profileShardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
//make failures ordering consistent between ordinary search and CCS by looking at the shard they come from
Arrays.sort(shardFailures, FAILURES_COMPARATOR);
InternalSearchResponse response = new InternalSearchResponse(mergedSearchHits, reducedAggs, suggest, profileShardResults,
topDocsStats.timedOut, topDocsStats.terminatedEarly, numReducePhases);
InternalSearchResponse response = new InternalSearchResponse(mergedSearchHits, DelayableWriteable.referencing(reducedAggs),
suggest, profileShardResults, topDocsStats.timedOut, topDocsStats.terminatedEarly, numReducePhases);
long tookInMillis = searchTimeProvider.buildTookInMillis();
return new SearchResponse(response, null, totalShards, successfulShards, skippedShards, tookInMillis, shardFailures, clusters);
}
@@ -40,20 +40,18 @@
* to parse aggregations into, which are not serializable. This is the common part that can be
* shared between core and client.
*/
public class SearchResponseSections implements ToXContentFragment {
public abstract class SearchResponseSections implements ToXContentFragment {

protected final SearchHits hits;
protected final Aggregations aggregations;
protected final Suggest suggest;
protected final SearchProfileShardResults profileResults;
protected final boolean timedOut;
protected final Boolean terminatedEarly;
protected final int numReducePhases;

public SearchResponseSections(SearchHits hits, Aggregations aggregations, Suggest suggest, boolean timedOut, Boolean terminatedEarly,
SearchProfileShardResults profileResults, int numReducePhases) {
public SearchResponseSections(SearchHits hits, Suggest suggest, boolean timedOut,
Boolean terminatedEarly, SearchProfileShardResults profileResults, int numReducePhases) {
this.hits = hits;
this.aggregations = aggregations;
this.suggest = suggest;
this.profileResults = profileResults;
this.timedOut = timedOut;
@@ -73,9 +71,7 @@ public final SearchHits hits() {
return hits;
}

public final Aggregations aggregations() {
return aggregations;
}
public abstract Aggregations aggregations();

public final Suggest suggest() {
return suggest;
@@ -104,6 +100,7 @@ public final Map<String, ProfileShardResult> profile() {
@Override
public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
hits.toXContent(builder, params);
Aggregations aggregations = aggregations();
if (aggregations != null) {
aggregations.toXContent(builder, params);
}
@@ -119,4 +116,24 @@ public final XContentBuilder toXContent(XContentBuilder builder, Params params)
protected void writeTo(StreamOutput out) throws IOException {
throw new UnsupportedOperationException();
}

/**
* A "simple" implementation of {@link SearchResponseSections} that
* doesn't do any of the funny server side things like implement
* serialization.
*/
public static class Simple extends SearchResponseSections {
private final Aggregations aggregations;

public Simple(SearchHits hits, Aggregations aggregations, Suggest suggest, boolean timedOut,
Boolean terminatedEarly, SearchProfileShardResults profileResults, int numReducePhases) {
super(hits, suggest, timedOut, terminatedEarly, profileResults, numReducePhases);
this.aggregations = aggregations;
}

@Override
public Aggregations aggregations() {
return aggregations;
}
}
}
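For callers, the new Simple subclass is the drop-in way to build response sections, as the high-level client test earlier in this diff shows. A small sketch of that pattern, assuming the class lives in org.elasticsearch.action.search; the scroll id and shard counts are placeholder values.

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchResponseSections;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.InternalAggregations;

class SimpleSectionsExample {
    static SearchResponse emptySearchResponse() {
        // Build the client-visible sections through the new Simple subclass;
        // nothing is delayed or serialized on this path.
        SearchResponseSections sections = new SearchResponseSections.Simple(
            SearchHits.empty(), InternalAggregations.EMPTY,
            null, false, false, null, 1);
        return new SearchResponse(sections, "scroll-id", 5, 5, 0, 100,
            ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY);
    }
}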
@@ -39,6 +39,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
@@ -279,11 +280,14 @@ static void ccsRemoteReduce(SearchRequest searchRequest, OriginalIndices localIn
remoteClusterClient.search(ccsSearchRequest, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
InternalSearchResponse internal = (InternalSearchResponse) searchResponse.getInternalResponse();
Map<String, ProfileShardResult> profileResults = searchResponse.getProfileResults();
SearchProfileShardResults profile = profileResults == null || profileResults.isEmpty()
? null : new SearchProfileShardResults(profileResults);
InternalAggregations aggs = internal.consumeAggregations();
DelayableWriteable<InternalAggregations> delayedAggs = aggs == null ? null : DelayableWriteable.referencing(aggs);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchResponse.getHits(),
(InternalAggregations) searchResponse.getAggregations(), searchResponse.getSuggest(), profile,
delayedAggs, searchResponse.getSuggest(), profile,
searchResponse.isTimedOut(), searchResponse.isTerminatedEarly(), searchResponse.getNumReducePhases());
listener.onResponse(new SearchResponse(internalSearchResponse, searchResponse.getScrollId(),
searchResponse.getTotalShards(), searchResponse.getSuccessfulShards(), searchResponse.getSkippedShards(),