From 5ed2c461d292062f116a37ed434b57546ec26466 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Thu, 5 Jan 2017 15:29:47 +0100 Subject: [PATCH 1/3] Add parsing from xContent to SearchResponse In preparation to be able to parse SearchResponse from its rest representation for the java rest client, this adds fromXContent to SearchResponse. Most of the information in the original object is preserved when parsing it back. However, the exceptions in the "failure" section won't be identical to the original ones on the server side since they are parsed back to a generic ElasticsearchException on the receiving side. Also the "aggregations", "suggest" and "profile" section parsing is currently skipped and will be added by subsequent PRs. --- .../search/TransportNoopSearchAction.java | 2 +- .../search/AbstractSearchAsyncAction.java | 4 +- .../SearchDfsQueryAndFetchAsyncAction.java | 2 +- .../SearchDfsQueryThenFetchAsyncAction.java | 3 +- .../action/search/SearchPhaseController.java | 3 +- .../SearchQueryAndFetchAsyncAction.java | 2 +- .../SearchQueryThenFetchAsyncAction.java | 3 +- .../action/search/SearchResponse.java | 191 +++++++++++++++++- .../SearchScrollQueryAndFetchAsyncAction.java | 3 +- ...SearchScrollQueryThenFetchAsyncAction.java | 4 +- .../action/search/ShardSearchFailure.java | 60 +++++- .../rest/action/RestActions.java | 18 +- .../aggregations/InternalAggregations.java | 2 +- .../search/internal/InternalSearchHits.java | 8 +- .../internal/InternalSearchResponse.java | 159 --------------- .../profile/SearchProfileShardResults.java | 3 +- .../elasticsearch/search/suggest/Suggest.java | 2 +- .../snapshots/SnapshotsService.java | 1 + .../search/SearchPhaseControllerTests.java | 10 +- .../action/search/SearchResponseTests.java | 120 +++++++++++ .../search/ShardSearchFailureTests.java | 93 +++++++++ .../reindex/AsyncBulkByScrollActionTests.java | 2 +- .../hamcrest/ElasticsearchAssertions.java | 21 +- 23 files changed, 506 insertions(+), 210 deletions(-) delete mode 100644 core/src/main/java/org/elasticsearch/search/internal/InternalSearchResponse.java create mode 100644 core/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java create mode 100644 core/src/test/java/org/elasticsearch/action/search/ShardSearchFailureTests.java diff --git a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java index c4397684bc41e..e2b2c2b06ab9b 100644 --- a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java +++ b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchResponse.InternalSearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; @@ -30,7 +31,6 @@ import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.internal.InternalSearchHit; import org.elasticsearch.search.internal.InternalSearchHits; -import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.profile.SearchProfileShardResults; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.threadpool.ThreadPool; diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 2479ff86750b9..d5faecb6a6c97 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -20,12 +20,14 @@ package org.elasticsearch.action.search; import com.carrotsearch.hppc.IntArrayList; + import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NoShardAvailableActionException; +import org.elasticsearch.action.search.SearchResponse.InternalSearchResponse; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; @@ -37,7 +39,6 @@ import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.fetch.ShardFetchSearchRequest; import org.elasticsearch.search.internal.AliasFilter; -import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResultProvider; @@ -96,6 +97,7 @@ protected AbstractSearchAsyncAction(Logger logger, SearchTransportService search this.concreteIndexBoosts = concreteIndexBoosts; } + @Override public void start() { if (expectedSuccessfulOps == 0) { //no search shards to search on, bail with empty response diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java index 9db3a21c48549..60e0fe64c2d3e 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.search.SearchResponse.InternalSearchResponse; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.common.util.concurrent.AtomicArray; @@ -31,7 +32,6 @@ import org.elasticsearch.search.dfs.DfsSearchResult; import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.internal.AliasFilter; -import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.search.query.QuerySearchRequest; diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index 3fe24cc991139..b6072751054d6 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -20,12 +20,14 @@ package org.elasticsearch.action.search; import com.carrotsearch.hppc.IntArrayList; + import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.search.SearchResponse.InternalSearchResponse; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.common.util.concurrent.AtomicArray; @@ -35,7 +37,6 @@ import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.ShardFetchSearchRequest; import org.elasticsearch.search.internal.AliasFilter; -import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.search.query.QuerySearchResult; diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 92270c6fe3680..5b1a213796dca 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -21,6 +21,7 @@ import com.carrotsearch.hppc.IntArrayList; import com.carrotsearch.hppc.ObjectObjectHashMap; + import org.apache.lucene.index.Term; import org.apache.lucene.search.CollectionStatistics; import org.apache.lucene.search.FieldDoc; @@ -30,6 +31,7 @@ import org.apache.lucene.search.TermStatistics; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopFieldDocs; +import org.elasticsearch.action.search.SearchResponse.InternalSearchResponse; import org.elasticsearch.common.collect.HppcMaps; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.lucene.Lucene; @@ -47,7 +49,6 @@ import org.elasticsearch.search.fetch.FetchSearchResultProvider; import org.elasticsearch.search.internal.InternalSearchHit; import org.elasticsearch.search.internal.InternalSearchHits; -import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.profile.ProfileShardResult; import org.elasticsearch.search.profile.SearchProfileShardResults; import org.elasticsearch.search.query.QuerySearchResult; diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java index f597ede64bc32..09f0838fde04c 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java @@ -22,11 +22,11 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.search.SearchResponse.InternalSearchResponse; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.internal.AliasFilter; -import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchTransportRequest; import java.io.IOException; diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 7b30006329193..4ada6115b203a 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -20,12 +20,14 @@ package org.elasticsearch.action.search; import com.carrotsearch.hppc.IntArrayList; + import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.search.SearchResponse.InternalSearchResponse; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.common.util.concurrent.AtomicArray; @@ -33,7 +35,6 @@ import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.ShardFetchSearchRequest; import org.elasticsearch.search.internal.AliasFilter; -import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.search.query.QuerySearchResultProvider; diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchResponse.java b/core/src/main/java/org/elasticsearch/action/search/SearchResponse.java index 4b8ba5e64b695..b43b45de6a4a3 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchResponse.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchResponse.java @@ -27,19 +27,27 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.StatusToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.RestActions; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.Aggregations; -import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.search.profile.ProfileShardResult; +import org.elasticsearch.search.profile.SearchProfileShardResults; import org.elasticsearch.search.suggest.Suggest; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; import static org.elasticsearch.action.search.ShardSearchFailure.readShardSearchFailure; -import static org.elasticsearch.search.internal.InternalSearchResponse.readInternalSearchResponse; +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.elasticsearch.common.xcontent.XContentParserUtils.throwUnknownField; +import static org.elasticsearch.search.internal.InternalSearchHits.readSearchHits; /** * A response of a search request. @@ -79,23 +87,23 @@ public RestStatus status() { * The search hits. */ public SearchHits getHits() { - return internalResponse.hits(); + return internalResponse.hits; } public Aggregations getAggregations() { - return internalResponse.aggregations(); + return internalResponse.aggregations; } public Suggest getSuggest() { - return internalResponse.suggest(); + return internalResponse.suggest; } /** * Has the search operation timed out. */ public boolean isTimedOut() { - return internalResponse.timedOut(); + return internalResponse.timedOut; } /** @@ -103,7 +111,7 @@ public boolean isTimedOut() { * terminateAfter */ public Boolean isTerminatedEarly() { - return internalResponse.terminatedEarly(); + return internalResponse.terminatedEarly; } /** @@ -197,14 +205,96 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t builder.field(Fields.TERMINATED_EARLY, isTerminatedEarly()); } RestActions.buildBroadcastShardsHeader(builder, params, getTotalShards(), getSuccessfulShards(), getFailedShards(), getShardFailures()); - internalResponse.toXContent(builder, params); + internalResponse.hits.toXContent(builder, params); + if (internalResponse.aggregations != null) { + internalResponse.aggregations.toXContent(builder, params); + } + if (internalResponse.suggest != null) { + internalResponse.suggest.toXContent(builder, params); + } + if (internalResponse.profileResults != null) { + internalResponse.profileResults.toXContent(builder, params); + } return builder; } + public static SearchResponse fromXContent(XContentParser parser) throws IOException { + XContentParser.Token token; + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation); + String currentFieldName = null; + InternalSearchHits hits = null; + boolean timedOut = false; + Boolean terminatedEarly = null; + int tookInMillis = 0; + int successfulShards = 0; + int totalShards = 0; + String scrollId = null; + List failures = new ArrayList<>(); + while((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + if (Fields._SCROLL_ID.equals(currentFieldName)) { + scrollId = parser.text(); + } else if (Fields.TOOK.equals(currentFieldName)) { + tookInMillis = parser.intValue(); + } else if (Fields.TIMED_OUT.equals(currentFieldName)) { + timedOut = parser.booleanValue(); + } else if (Fields.TERMINATED_EARLY.equals(currentFieldName)) { + terminatedEarly = parser.booleanValue(); + } + else { + throwUnknownField(currentFieldName, parser.getTokenLocation()); + } + } else if (token == XContentParser.Token.START_OBJECT) { + if (InternalSearchHits.Fields.HITS.equals(currentFieldName)) { + hits = InternalSearchHits.fromXContent(parser); + } else if (InternalAggregations.Fields.AGGREGATIONS.equals(currentFieldName)) { + // TODO parse aggregation result correctly + parser.skipChildren(); + } else if (Suggest.NAME.equals(currentFieldName)) { + // TODO parse suggest section + parser.skipChildren(); + } else if (SearchProfileShardResults.PROFILE_NAME.equals(currentFieldName)) { + // TODO parse "profile" section + parser.skipChildren(); + } else if (RestActions.ShardsFields._SHARDS.equals(currentFieldName)) { + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + if (RestActions.ShardsFields.FAILED.equals(currentFieldName)) { + parser.intValue(); // we don't need it but need to consume it + } else if (RestActions.ShardsFields.SUCCESSFUL.equals(currentFieldName)) { + successfulShards = parser.intValue(); + } else if (RestActions.ShardsFields.TOTAL.equals(currentFieldName)) { + totalShards = parser.intValue(); + } else { + throwUnknownField(currentFieldName, parser.getTokenLocation()); + } + } else if (token == XContentParser.Token.START_ARRAY) { + if (RestActions.ShardsFields.FAILURES.equals(currentFieldName)) { + while((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + failures.add(ShardSearchFailure.fromXContent(parser)); + } + } else { + throwUnknownField(currentFieldName, parser.getTokenLocation()); + } + } + } + } else { + throwUnknownField(currentFieldName, parser.getTokenLocation()); + } + } + } + return new SearchResponse(new InternalSearchResponse(hits, null, null, null, timedOut, terminatedEarly), + scrollId, totalShards, successfulShards, tookInMillis, failures.toArray(new ShardSearchFailure[0])); + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - internalResponse = readInternalSearchResponse(in); + internalResponse = InternalSearchResponse.readInternalSearchResponse(in); totalShards = in.readVInt(); successfulShards = in.readVInt(); int size = in.readVInt(); @@ -240,4 +330,87 @@ public void writeTo(StreamOutput out) throws IOException { public String toString() { return Strings.toString(this); } + + public static class InternalSearchResponse { + + public static InternalSearchResponse empty() { + return new InternalSearchResponse(InternalSearchHits.empty(), null, null, null, false, null); + } + + InternalSearchHits hits; + + private InternalAggregations aggregations; + + private Suggest suggest; + + private SearchProfileShardResults profileResults; + + private boolean timedOut; + + private Boolean terminatedEarly = null; + + private InternalSearchResponse() { + } + + public InternalSearchResponse(InternalSearchHits hits, InternalAggregations aggregations, Suggest suggest, + SearchProfileShardResults profileResults, boolean timedOut, Boolean terminatedEarly) { + this.hits = hits; + this.aggregations = aggregations; + this.suggest = suggest; + this.profileResults = profileResults; + this.timedOut = timedOut; + this.terminatedEarly = terminatedEarly; + } + + /** + * Returns the profile results for this search response (including all shards). + * An empty map is returned if profiling was not enabled + * + * @return Profile results + */ + public Map profile() { + if (profileResults == null) { + return Collections.emptyMap(); + } + return profileResults.getShardResults(); + } + + public static InternalSearchResponse readInternalSearchResponse(StreamInput in) throws IOException { + InternalSearchResponse response = new InternalSearchResponse(); + response.readFrom(in); + return response; + } + + public void readFrom(StreamInput in) throws IOException { + hits = readSearchHits(in); + if (in.readBoolean()) { + aggregations = InternalAggregations.readAggregations(in); + } + if (in.readBoolean()) { + suggest = Suggest.readSuggest(in); + } + timedOut = in.readBoolean(); + terminatedEarly = in.readOptionalBoolean(); + profileResults = in.readOptionalWriteable(SearchProfileShardResults::new); + } + + public void writeTo(StreamOutput out) throws IOException { + hits.writeTo(out); + if (aggregations == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + aggregations.writeTo(out); + } + if (suggest == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + suggest.writeTo(out); + } + out.writeBoolean(timedOut); + out.writeOptionalBoolean(terminatedEarly); + out.writeOptionalWriteable(profileResults); + } + } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java index bf53fc719c6c3..b54ea8e7a15d5 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchResponse.InternalSearchResponse; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; @@ -31,7 +32,6 @@ import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult; import org.elasticsearch.search.internal.InternalScrollSearchRequest; -import org.elasticsearch.search.internal.InternalSearchResponse; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -91,6 +91,7 @@ protected final void addShardFailure(final int shardIndex, ShardSearchFailure fa shardFailures.set(shardIndex, failure); } + @Override public void start() { if (scrollId.getContext().length == 0) { listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", ShardSearchFailure.EMPTY_ARRAY)); diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java index 851e3343bc2ed..0ff2423b74829 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java @@ -20,11 +20,13 @@ package org.elasticsearch.action.search; import com.carrotsearch.hppc.IntArrayList; + import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchResponse.InternalSearchResponse; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; @@ -32,7 +34,6 @@ import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.ShardFetchRequest; import org.elasticsearch.search.internal.InternalScrollSearchRequest; -import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.ScrollQuerySearchResult; @@ -94,6 +95,7 @@ protected final void addShardFailure(final int shardIndex, ShardSearchFailure fa shardFailures.set(shardIndex, failure); } + @Override public void start() { if (scrollId.getContext().length == 0) { listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", ShardSearchFailure.EMPTY_ARRAY)); diff --git a/core/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java b/core/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java index 8070081dcd865..f6e1149f2dc3c 100644 --- a/core/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java +++ b/core/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java @@ -22,21 +22,34 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchException; import org.elasticsearch.search.SearchShardTarget; import java.io.IOException; +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.elasticsearch.common.xcontent.XContentParserUtils.throwUnknownField; + /** * Represents a failure to search on a specific shard. */ public class ShardSearchFailure implements ShardOperationFailedException { + private static final String REASON_FIELD = "reason"; + private static final String NODE_FIELD = "node"; + private static final String INDEX_FIELD = "index"; + private static final String SHARD_FIELD = "shard"; + public static final ShardSearchFailure[] EMPTY_ARRAY = new ShardSearchFailure[0]; private SearchShardTarget shardTarget; @@ -68,7 +81,7 @@ public ShardSearchFailure(String reason, SearchShardTarget shardTarget) { this(reason, shardTarget, RestStatus.INTERNAL_SERVER_ERROR); } - public ShardSearchFailure(String reason, SearchShardTarget shardTarget, RestStatus status) { + private ShardSearchFailure(String reason, SearchShardTarget shardTarget, RestStatus status) { this.shardTarget = shardTarget; this.reason = reason; this.status = status; @@ -153,13 +166,13 @@ public void writeTo(StreamOutput out) throws IOException { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field("shard", shardId()); - builder.field("index", index()); + builder.field(SHARD_FIELD, shardId()); + builder.field(INDEX_FIELD, index()); if (shardTarget != null) { - builder.field("node", shardTarget.nodeId()); + builder.field(NODE_FIELD, shardTarget.nodeId()); } if (cause != null) { - builder.field("reason"); + builder.field(REASON_FIELD); builder.startObject(); ElasticsearchException.toXContent(builder, params, cause); builder.endObject(); @@ -167,8 +180,45 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } + public static ShardSearchFailure fromXContent(XContentParser parser) throws IOException { + XContentParser.Token token; + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation); + String currentFieldName = null; + int shardId = -1; + String indexName = null; + String nodeId = null; + ElasticsearchException exception = null; + while((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + if (SHARD_FIELD.equals(currentFieldName)) { + shardId = parser.intValue(); + } else if (INDEX_FIELD.equals(currentFieldName)) { + indexName = parser.text(); + } else if (NODE_FIELD.equals(currentFieldName)) { + nodeId = parser.text(); + } else { + throwUnknownField(currentFieldName, parser.getTokenLocation()); + } + } else if (token == XContentParser.Token.START_OBJECT) { + if (REASON_FIELD.equals(currentFieldName)) { + exception = ElasticsearchException.fromXContent(parser); + } else { + throwUnknownField(currentFieldName, parser.getTokenLocation()); + } + } else { + throw new ParsingException(parser.getTokenLocation(), + "Failed to parse object: unsupported token found [" + token + "]"); + } + } + return new ShardSearchFailure(exception, + new SearchShardTarget(nodeId, new ShardId(new Index(indexName, IndexMetaData.INDEX_UUID_NA_VALUE), shardId))); + } + @Override public Throwable getCause() { return cause; } + } diff --git a/core/src/main/java/org/elasticsearch/rest/action/RestActions.java b/core/src/main/java/org/elasticsearch/rest/action/RestActions.java index 4a96becbbbdce..bc3b376214ce4 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/RestActions.java +++ b/core/src/main/java/org/elasticsearch/rest/action/RestActions.java @@ -69,15 +69,23 @@ public static void buildBroadcastShardsHeader(XContentBuilder builder, Params pa response.getShardFailures()); } + public static final class ShardsFields { + public static final String _SHARDS = "_shards"; + public static final String TOTAL = "total"; + public static final String SUCCESSFUL = "successful"; + public static final String FAILED = "failed"; + public static final String FAILURES = "failures"; + } + public static void buildBroadcastShardsHeader(XContentBuilder builder, Params params, int total, int successful, int failed, ShardOperationFailedException[] shardFailures) throws IOException { - builder.startObject("_shards"); - builder.field("total", total); - builder.field("successful", successful); - builder.field("failed", failed); + builder.startObject(ShardsFields._SHARDS); + builder.field(ShardsFields.TOTAL, total); + builder.field(ShardsFields.SUCCESSFUL, successful); + builder.field(ShardsFields.FAILED, failed); if (shardFailures != null && shardFailures.length > 0) { - builder.startArray("failures"); + builder.startArray(ShardsFields.FAILURES); final boolean group = params.paramAsBoolean("group_shard_failures", true); // we group by default for (ShardOperationFailedException shardFailure : group ? ExceptionsHelper.groupBy(shardFailures) : shardFailures) { builder.startObject(); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index 66e45156caf06..55c5343d0f50d 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -161,7 +161,7 @@ public static InternalAggregations reduce(List aggregation } /** The fields required to write this addAggregation to xcontent */ - static class Fields { + public static class Fields { public static final String AGGREGATIONS = "aggregations"; } diff --git a/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHits.java b/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHits.java index f4010ee1927ea..2968ff4f211d2 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHits.java +++ b/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHits.java @@ -112,10 +112,10 @@ public InternalSearchHit[] internalHits() { return this.hits; } - static final class Fields { - static final String HITS = "hits"; - static final String TOTAL = "total"; - static final String MAX_SCORE = "max_score"; + public static final class Fields { + public static final String HITS = "hits"; + public static final String TOTAL = "total"; + public static final String MAX_SCORE = "max_score"; } @Override diff --git a/core/src/main/java/org/elasticsearch/search/internal/InternalSearchResponse.java b/core/src/main/java/org/elasticsearch/search/internal/InternalSearchResponse.java deleted file mode 100644 index 871d176ffcd48..0000000000000 --- a/core/src/main/java/org/elasticsearch/search/internal/InternalSearchResponse.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.search.internal; - -import org.elasticsearch.Version; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.search.SearchHits; -import org.elasticsearch.search.aggregations.Aggregations; -import org.elasticsearch.search.aggregations.InternalAggregations; -import org.elasticsearch.search.profile.ProfileShardResult; -import org.elasticsearch.search.profile.SearchProfileShardResults; -import org.elasticsearch.search.suggest.Suggest; - -import java.io.IOException; -import java.util.Collections; -import java.util.Map; - -import static org.elasticsearch.search.internal.InternalSearchHits.readSearchHits; - -public class InternalSearchResponse implements Streamable, ToXContent { - - public static InternalSearchResponse empty() { - return new InternalSearchResponse(InternalSearchHits.empty(), null, null, null, false, null); - } - - private InternalSearchHits hits; - - private InternalAggregations aggregations; - - private Suggest suggest; - - private SearchProfileShardResults profileResults; - - private boolean timedOut; - - private Boolean terminatedEarly = null; - - private InternalSearchResponse() { - } - - public InternalSearchResponse(InternalSearchHits hits, InternalAggregations aggregations, Suggest suggest, - SearchProfileShardResults profileResults, boolean timedOut, Boolean terminatedEarly) { - this.hits = hits; - this.aggregations = aggregations; - this.suggest = suggest; - this.profileResults = profileResults; - this.timedOut = timedOut; - this.terminatedEarly = terminatedEarly; - } - - public boolean timedOut() { - return this.timedOut; - } - - public Boolean terminatedEarly() { - return this.terminatedEarly; - } - - public SearchHits hits() { - return hits; - } - - public Aggregations aggregations() { - return aggregations; - } - - public Suggest suggest() { - return suggest; - } - - /** - * Returns the profile results for this search response (including all shards). - * An empty map is returned if profiling was not enabled - * - * @return Profile results - */ - public Map profile() { - if (profileResults == null) { - return Collections.emptyMap(); - } - return profileResults.getShardResults(); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - hits.toXContent(builder, params); - if (aggregations != null) { - aggregations.toXContent(builder, params); - } - if (suggest != null) { - suggest.toXContent(builder, params); - } - if (profileResults != null) { - profileResults.toXContent(builder, params); - } - return builder; - } - - public static InternalSearchResponse readInternalSearchResponse(StreamInput in) throws IOException { - InternalSearchResponse response = new InternalSearchResponse(); - response.readFrom(in); - return response; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - hits = readSearchHits(in); - if (in.readBoolean()) { - aggregations = InternalAggregations.readAggregations(in); - } - if (in.readBoolean()) { - suggest = Suggest.readSuggest(in); - } - timedOut = in.readBoolean(); - terminatedEarly = in.readOptionalBoolean(); - profileResults = in.readOptionalWriteable(SearchProfileShardResults::new); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - hits.writeTo(out); - if (aggregations == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - aggregations.writeTo(out); - } - if (suggest == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - suggest.writeTo(out); - } - out.writeBoolean(timedOut); - out.writeOptionalBoolean(terminatedEarly); - out.writeOptionalWriteable(profileResults); - } -} diff --git a/core/src/main/java/org/elasticsearch/search/profile/SearchProfileShardResults.java b/core/src/main/java/org/elasticsearch/search/profile/SearchProfileShardResults.java index 6794aa49399cb..a88c301afdd2b 100644 --- a/core/src/main/java/org/elasticsearch/search/profile/SearchProfileShardResults.java +++ b/core/src/main/java/org/elasticsearch/search/profile/SearchProfileShardResults.java @@ -42,6 +42,7 @@ */ public final class SearchProfileShardResults implements Writeable, ToXContent{ + public static final String PROFILE_NAME = "profile"; private Map shardResults; public SearchProfileShardResults(Map shardResults) { @@ -75,7 +76,7 @@ public void writeTo(StreamOutput out) throws IOException { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject("profile").startArray("shards"); + builder.startObject(PROFILE_NAME).startArray("shards"); for (Map.Entry entry : shardResults.entrySet()) { builder.startObject(); diff --git a/core/src/main/java/org/elasticsearch/search/suggest/Suggest.java b/core/src/main/java/org/elasticsearch/search/suggest/Suggest.java index fc372ee6b2dac..d9b4b7fca6e58 100644 --- a/core/src/main/java/org/elasticsearch/search/suggest/Suggest.java +++ b/core/src/main/java/org/elasticsearch/search/suggest/Suggest.java @@ -47,7 +47,7 @@ */ public class Suggest implements Iterable>>, Streamable, ToXContent { - private static final String NAME = "suggest"; + public static final String NAME = "suggest"; public static final Comparator