diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java index e2a6dcac20b06..05ce54437a4d6 100755 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java @@ -35,6 +35,7 @@ import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.ClearScrollRequest; +import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.support.ActiveShardCount; @@ -49,6 +50,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; @@ -381,6 +383,18 @@ static Request clearScroll(ClearScrollRequest clearScrollRequest) throws IOExcep return new Request("DELETE", "/_search/scroll", Collections.emptyMap(), entity); } + static Request multiSearch(MultiSearchRequest multiSearchRequest) throws IOException { + Params params = Params.builder(); + params.putParam(RestSearchAction.TYPED_KEYS_PARAM, "true"); + if (multiSearchRequest.maxConcurrentSearchRequests() != MultiSearchRequest.MAX_CONCURRENT_SEARCH_REQUESTS_DEFAULT) { + params.putParam("max_concurrent_searches", Integer.toString(multiSearchRequest.maxConcurrentSearchRequests())); + } + XContent xContent = REQUEST_BODY_CONTENT_TYPE.xContent(); + byte[] source = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, xContent); + HttpEntity entity = new ByteArrayEntity(source, createContentType(xContent.type())); + return new Request("GET", "/_msearch", params.getParams(), entity); + } + private static HttpEntity createEntity(ToXContent toXContent, XContentType xContentType) throws IOException { BytesRef source = XContentHelper.toXContent(toXContent, xContentType, false).toBytesRef(); return new ByteArrayEntity(source.bytes, source.offset, source.length, createContentType(xContentType)); diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java index 2ebaf2cf342ff..29ab7f90ff5c8 100755 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java @@ -38,6 +38,8 @@ import org.elasticsearch.action.main.MainResponse; import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.ClearScrollResponse; +import org.elasticsearch.action.search.MultiSearchRequest; +import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; @@ -377,6 +379,28 @@ public final void searchAsync(SearchRequest searchRequest, ActionListenerMulti search API on + * elastic.co + */ + public final MultiSearchResponse multiSearch(MultiSearchRequest multiSearchRequest, Header... headers) throws IOException { + return performRequestAndParseEntity(multiSearchRequest, Request::multiSearch, MultiSearchResponse::fromXContext, + emptySet(), headers); + } + + /** + * Asynchronously executes a multi search using the msearch API + * + * See Multi search API on + * elastic.co + */ + public final void multiSearchAsync(MultiSearchRequest searchRequest, ActionListener listener, Header... headers) { + performRequestAsyncAndParseEntity(searchRequest, Request::multiSearch, MultiSearchResponse::fromXContext, listener, + emptySet(), headers); + } + /** * Executes a search using the Search Scroll API * diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java index 3be250d513d21..f72a7cb4dbf51 100755 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.ClearScrollRequest; +import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.SearchType; @@ -42,6 +43,7 @@ import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -56,6 +58,7 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.rest.action.search.RestSearchAction; +import org.elasticsearch.search.Scroll; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.support.ValueType; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -72,16 +75,21 @@ import java.io.InputStream; import java.lang.reflect.Constructor; import java.lang.reflect.Modifier; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.StringJoiner; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import static java.util.Collections.singletonMap; +import static org.elasticsearch.client.Request.REQUEST_BODY_CONTENT_TYPE; import static org.elasticsearch.client.Request.enforceSameContentType; +import static org.elasticsearch.search.RandomSearchRequestGenerator.randomSearchRequest; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent; public class RequestTests extends ESTestCase { @@ -771,6 +779,55 @@ public void testSearch() throws Exception { } } + public void testMultiSearch() throws IOException { + int numberOfSearchRequests = randomIntBetween(0, 32); + MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); + for (int i = 0; i < numberOfSearchRequests; i++) { + SearchRequest searchRequest = randomSearchRequest(() -> { + // No need to return a very complex SearchSourceBuilder here, that is tested elsewhere + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.from(randomInt(10)); + searchSourceBuilder.size(randomIntBetween(20, 100)); + return searchSourceBuilder; + }); + // scroll is not supported in the current msearch api, so unset it: + searchRequest.scroll((Scroll) null); + // only expand_wildcards, ignore_unavailable and allow_no_indices can be specified from msearch api, so unset other options: + IndicesOptions randomlyGenerated = searchRequest.indicesOptions(); + IndicesOptions msearchDefault = new MultiSearchRequest().indicesOptions(); + searchRequest.indicesOptions(IndicesOptions.fromOptions( + randomlyGenerated.ignoreUnavailable(), randomlyGenerated.allowNoIndices(), randomlyGenerated.expandWildcardsOpen(), + randomlyGenerated.expandWildcardsClosed(), msearchDefault.allowAliasesToMultipleIndices(), + msearchDefault.forbidClosedIndices(), msearchDefault.ignoreAliases() + )); + multiSearchRequest.add(searchRequest); + } + + Map expectedParams = new HashMap<>(); + expectedParams.put(RestSearchAction.TYPED_KEYS_PARAM, "true"); + if (randomBoolean()) { + multiSearchRequest.maxConcurrentSearchRequests(randomIntBetween(1, 8)); + expectedParams.put("max_concurrent_searches", Integer.toString(multiSearchRequest.maxConcurrentSearchRequests())); + } + + Request request = Request.multiSearch(multiSearchRequest); + assertEquals("/_msearch", request.getEndpoint()); + assertEquals(expectedParams, request.getParameters()); + + List requests = new ArrayList<>(); + CheckedBiConsumer consumer = (searchRequest, p) -> { + SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.fromXContent(p); + if (searchSourceBuilder.equals(new SearchSourceBuilder()) == false) { + searchRequest.source(searchSourceBuilder); + } + requests.add(searchRequest); + }; + MultiSearchRequest.readMultiLineFormat(new BytesArray(EntityUtils.toByteArray(request.getEntity())), + REQUEST_BODY_CONTENT_TYPE.xContent(), consumer, null, multiSearchRequest.indicesOptions(), null, null, + null, xContentRegistry(), true); + assertEquals(requests, multiSearchRequest.requests()); + } + public void testSearchScroll() throws IOException { SearchScrollRequest searchScrollRequest = new SearchScrollRequest(); searchScrollRequest.scrollId(randomAlphaOfLengthBetween(5, 10)); @@ -782,7 +839,7 @@ public void testSearchScroll() throws IOException { assertEquals("/_search/scroll", request.getEndpoint()); assertEquals(0, request.getParameters().size()); assertToXContentBody(searchScrollRequest, request.getEntity()); - assertEquals(Request.REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue()); + assertEquals(REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue()); } public void testClearScroll() throws IOException { @@ -796,11 +853,11 @@ public void testClearScroll() throws IOException { assertEquals("/_search/scroll", request.getEndpoint()); assertEquals(0, request.getParameters().size()); assertToXContentBody(clearScrollRequest, request.getEntity()); - assertEquals(Request.REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue()); + assertEquals(REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue()); } private static void assertToXContentBody(ToXContent expectedBody, HttpEntity actualEntity) throws IOException { - BytesReference expectedBytes = XContentHelper.toXContent(expectedBody, Request.REQUEST_BODY_CONTENT_TYPE, false); + BytesReference expectedBytes = XContentHelper.toXContent(expectedBody, REQUEST_BODY_CONTENT_TYPE, false); assertEquals(XContentType.JSON.mediaTypeWithoutParameters(), actualEntity.getContentType().getValue()); assertEquals(expectedBytes, new BytesArray(EntityUtils.toByteArray(actualEntity))); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/SearchIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/SearchIT.java index 289ebf372d804..3e72c7c64b6cb 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/SearchIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/SearchIT.java @@ -23,20 +23,30 @@ import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; import org.apache.http.nio.entity.NStringEntity; +import org.apache.lucene.search.join.ScoreMode; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.ClearScrollResponse; +import org.elasticsearch.action.search.MultiSearchRequest; +import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.MatchQueryBuilder; +import org.elasticsearch.index.query.NestedQueryBuilder; +import org.elasticsearch.index.query.ScriptQueryBuilder; +import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.join.aggregations.Children; import org.elasticsearch.join.aggregations.ChildrenAggregationBuilder; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.bucket.range.Range; import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.terms.Terms; @@ -45,10 +55,12 @@ import org.elasticsearch.search.aggregations.matrix.stats.MatrixStatsAggregationBuilder; import org.elasticsearch.search.aggregations.support.ValueType; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.SuggestBuilder; import org.elasticsearch.search.suggest.phrase.PhraseSuggestionBuilder; +import org.hamcrest.Matchers; import org.junit.Before; import java.io.IOException; @@ -64,6 +76,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.nullValue; public class SearchIT extends ESRestHighLevelClientTestCase { @@ -80,10 +93,24 @@ public void indexDocuments() throws IOException { StringEntity doc5 = new StringEntity("{\"type\":\"type2\", \"num\":100, \"num2\":10}", ContentType.APPLICATION_JSON); client().performRequest("PUT", "/index/type/5", Collections.emptyMap(), doc5); client().performRequest("POST", "/index/_refresh"); + + StringEntity doc = new StringEntity("{\"field\":\"value1\"}", ContentType.APPLICATION_JSON); + client().performRequest("PUT", "/index1/doc/1", Collections.emptyMap(), doc); + doc = new StringEntity("{\"field\":\"value2\"}", ContentType.APPLICATION_JSON); + client().performRequest("PUT", "/index1/doc/2", Collections.emptyMap(), doc); + doc = new StringEntity("{\"field\":\"value1\"}", ContentType.APPLICATION_JSON); + client().performRequest("PUT", "/index2/doc/3", Collections.emptyMap(), doc); + doc = new StringEntity("{\"field\":\"value2\"}", ContentType.APPLICATION_JSON); + client().performRequest("PUT", "/index2/doc/4", Collections.emptyMap(), doc); + doc = new StringEntity("{\"field\":\"value1\"}", ContentType.APPLICATION_JSON); + client().performRequest("PUT", "/index3/doc/5", Collections.emptyMap(), doc); + doc = new StringEntity("{\"field\":\"value2\"}", ContentType.APPLICATION_JSON); + client().performRequest("PUT", "/index3/doc/6", Collections.emptyMap(), doc); + client().performRequest("POST", "/index1,index2,index3/_refresh"); } public void testSearchNoQuery() throws IOException { - SearchRequest searchRequest = new SearchRequest(); + SearchRequest searchRequest = new SearchRequest("index"); SearchResponse searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync); assertSearchHeader(searchResponse); assertNull(searchResponse.getAggregations()); @@ -106,7 +133,7 @@ public void testSearchNoQuery() throws IOException { } public void testSearchMatchQuery() throws IOException { - SearchRequest searchRequest = new SearchRequest(); + SearchRequest searchRequest = new SearchRequest("index"); searchRequest.source(new SearchSourceBuilder().query(new MatchQueryBuilder("num", 10))); SearchResponse searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync); assertSearchHeader(searchResponse); @@ -164,7 +191,7 @@ public void testSearchWithRangeAgg() throws IOException { assertEquals(RestStatus.BAD_REQUEST, exception.status()); } - SearchRequest searchRequest = new SearchRequest(); + SearchRequest searchRequest = new SearchRequest("index"); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.aggregation(new RangeAggregationBuilder("agg1").field("num") .addRange("first", 0, 30).addRange("second", 31, 200)); @@ -193,7 +220,7 @@ public void testSearchWithRangeAgg() throws IOException { } public void testSearchWithTermsAndRangeAgg() throws IOException { - SearchRequest searchRequest = new SearchRequest(); + SearchRequest searchRequest = new SearchRequest("index"); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); TermsAggregationBuilder agg = new TermsAggregationBuilder("agg1", ValueType.STRING).field("type.keyword"); agg.subAggregation(new RangeAggregationBuilder("subagg").field("num") @@ -247,7 +274,7 @@ public void testSearchWithTermsAndRangeAgg() throws IOException { } public void testSearchWithMatrixStats() throws IOException { - SearchRequest searchRequest = new SearchRequest(); + SearchRequest searchRequest = new SearchRequest("index"); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.aggregation(new MatrixStatsAggregationBuilder("agg1").fields(Arrays.asList("num", "num2"))); searchSourceBuilder.size(0); @@ -374,7 +401,7 @@ public void testSearchWithParentJoin() throws IOException { } public void testSearchWithSuggest() throws IOException { - SearchRequest searchRequest = new SearchRequest(); + SearchRequest searchRequest = new SearchRequest("index"); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.suggest(new SuggestBuilder().addSuggestion("sugg1", new PhraseSuggestionBuilder("type")) .setGlobalText("type")); @@ -464,6 +491,185 @@ public void testSearchScroll() throws Exception { } } + public void testMultiSearch() throws Exception { + MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); + SearchRequest searchRequest1 = new SearchRequest("index1"); + searchRequest1.source().sort("_id", SortOrder.ASC); + multiSearchRequest.add(searchRequest1); + SearchRequest searchRequest2 = new SearchRequest("index2"); + searchRequest2.source().sort("_id", SortOrder.ASC); + multiSearchRequest.add(searchRequest2); + SearchRequest searchRequest3 = new SearchRequest("index3"); + searchRequest3.source().sort("_id", SortOrder.ASC); + multiSearchRequest.add(searchRequest3); + + MultiSearchResponse multiSearchResponse = + execute(multiSearchRequest, highLevelClient()::multiSearch, highLevelClient()::multiSearchAsync); + assertThat(multiSearchResponse.getTook().millis(), Matchers.greaterThanOrEqualTo(0L)); + assertThat(multiSearchResponse.getResponses().length, Matchers.equalTo(3)); + + assertThat(multiSearchResponse.getResponses()[0].getFailure(), Matchers.nullValue()); + assertThat(multiSearchResponse.getResponses()[0].isFailure(), Matchers.is(false)); + SearchIT.assertSearchHeader(multiSearchResponse.getResponses()[0].getResponse()); + assertThat(multiSearchResponse.getResponses()[0].getResponse().getHits().getTotalHits(), Matchers.equalTo(2L)); + assertThat(multiSearchResponse.getResponses()[0].getResponse().getHits().getAt(0).getId(), Matchers.equalTo("1")); + assertThat(multiSearchResponse.getResponses()[0].getResponse().getHits().getAt(1).getId(), Matchers.equalTo("2")); + + assertThat(multiSearchResponse.getResponses()[1].getFailure(), Matchers.nullValue()); + assertThat(multiSearchResponse.getResponses()[1].isFailure(), Matchers.is(false)); + SearchIT.assertSearchHeader(multiSearchResponse.getResponses()[1].getResponse()); + assertThat(multiSearchResponse.getResponses()[1].getResponse().getHits().getTotalHits(), Matchers.equalTo(2L)); + assertThat(multiSearchResponse.getResponses()[1].getResponse().getHits().getAt(0).getId(), Matchers.equalTo("3")); + assertThat(multiSearchResponse.getResponses()[1].getResponse().getHits().getAt(1).getId(), Matchers.equalTo("4")); + + assertThat(multiSearchResponse.getResponses()[2].getFailure(), Matchers.nullValue()); + assertThat(multiSearchResponse.getResponses()[2].isFailure(), Matchers.is(false)); + SearchIT.assertSearchHeader(multiSearchResponse.getResponses()[2].getResponse()); + assertThat(multiSearchResponse.getResponses()[2].getResponse().getHits().getTotalHits(), Matchers.equalTo(2L)); + assertThat(multiSearchResponse.getResponses()[2].getResponse().getHits().getAt(0).getId(), Matchers.equalTo("5")); + assertThat(multiSearchResponse.getResponses()[2].getResponse().getHits().getAt(1).getId(), Matchers.equalTo("6")); + } + + public void testMultiSearch_withAgg() throws Exception { + MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); + SearchRequest searchRequest1 = new SearchRequest("index1"); + searchRequest1.source().size(0).aggregation(new TermsAggregationBuilder("name", ValueType.STRING).field("field.keyword") + .order(BucketOrder.key(true))); + multiSearchRequest.add(searchRequest1); + SearchRequest searchRequest2 = new SearchRequest("index2"); + searchRequest2.source().size(0).aggregation(new TermsAggregationBuilder("name", ValueType.STRING).field("field.keyword") + .order(BucketOrder.key(true))); + multiSearchRequest.add(searchRequest2); + SearchRequest searchRequest3 = new SearchRequest("index3"); + searchRequest3.source().size(0).aggregation(new TermsAggregationBuilder("name", ValueType.STRING).field("field.keyword") + .order(BucketOrder.key(true))); + multiSearchRequest.add(searchRequest3); + + MultiSearchResponse multiSearchResponse = + execute(multiSearchRequest, highLevelClient()::multiSearch, highLevelClient()::multiSearchAsync); + assertThat(multiSearchResponse.getTook().millis(), Matchers.greaterThanOrEqualTo(0L)); + assertThat(multiSearchResponse.getResponses().length, Matchers.equalTo(3)); + + assertThat(multiSearchResponse.getResponses()[0].getFailure(), Matchers.nullValue()); + assertThat(multiSearchResponse.getResponses()[0].isFailure(), Matchers.is(false)); + SearchIT.assertSearchHeader(multiSearchResponse.getResponses()[0].getResponse()); + assertThat(multiSearchResponse.getResponses()[0].getResponse().getHits().getTotalHits(), Matchers.equalTo(2L)); + assertThat(multiSearchResponse.getResponses()[0].getResponse().getHits().getHits().length, Matchers.equalTo(0)); + Terms terms = multiSearchResponse.getResponses()[0].getResponse().getAggregations().get("name"); + assertThat(terms.getBuckets().size(), Matchers.equalTo(2)); + assertThat(terms.getBuckets().get(0).getKeyAsString(), Matchers.equalTo("value1")); + assertThat(terms.getBuckets().get(1).getKeyAsString(), Matchers.equalTo("value2")); + + assertThat(multiSearchResponse.getResponses()[1].getFailure(), Matchers.nullValue()); + assertThat(multiSearchResponse.getResponses()[1].isFailure(), Matchers.is(false)); + SearchIT.assertSearchHeader(multiSearchResponse.getResponses()[0].getResponse()); + assertThat(multiSearchResponse.getResponses()[1].getResponse().getHits().getTotalHits(), Matchers.equalTo(2L)); + assertThat(multiSearchResponse.getResponses()[1].getResponse().getHits().getHits().length, Matchers.equalTo(0)); + terms = multiSearchResponse.getResponses()[1].getResponse().getAggregations().get("name"); + assertThat(terms.getBuckets().size(), Matchers.equalTo(2)); + assertThat(terms.getBuckets().get(0).getKeyAsString(), Matchers.equalTo("value1")); + assertThat(terms.getBuckets().get(1).getKeyAsString(), Matchers.equalTo("value2")); + + assertThat(multiSearchResponse.getResponses()[2].getFailure(), Matchers.nullValue()); + assertThat(multiSearchResponse.getResponses()[2].isFailure(), Matchers.is(false)); + SearchIT.assertSearchHeader(multiSearchResponse.getResponses()[0].getResponse()); + assertThat(multiSearchResponse.getResponses()[2].getResponse().getHits().getTotalHits(), Matchers.equalTo(2L)); + assertThat(multiSearchResponse.getResponses()[2].getResponse().getHits().getHits().length, Matchers.equalTo(0)); + terms = multiSearchResponse.getResponses()[2].getResponse().getAggregations().get("name"); + assertThat(terms.getBuckets().size(), Matchers.equalTo(2)); + assertThat(terms.getBuckets().get(0).getKeyAsString(), Matchers.equalTo("value1")); + assertThat(terms.getBuckets().get(1).getKeyAsString(), Matchers.equalTo("value2")); + } + + public void testMultiSearch_withQuery() throws Exception { + MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); + SearchRequest searchRequest1 = new SearchRequest("index1"); + searchRequest1.source().query(new TermsQueryBuilder("field", "value2")); + multiSearchRequest.add(searchRequest1); + SearchRequest searchRequest2 = new SearchRequest("index2"); + searchRequest2.source().query(new TermsQueryBuilder("field", "value2")); + multiSearchRequest.add(searchRequest2); + SearchRequest searchRequest3 = new SearchRequest("index3"); + searchRequest3.source().query(new TermsQueryBuilder("field", "value2")); + multiSearchRequest.add(searchRequest3); + + MultiSearchResponse multiSearchResponse = + execute(multiSearchRequest, highLevelClient()::multiSearch, highLevelClient()::multiSearchAsync); + assertThat(multiSearchResponse.getTook().millis(), Matchers.greaterThanOrEqualTo(0L)); + assertThat(multiSearchResponse.getResponses().length, Matchers.equalTo(3)); + + assertThat(multiSearchResponse.getResponses()[0].getFailure(), Matchers.nullValue()); + assertThat(multiSearchResponse.getResponses()[0].isFailure(), Matchers.is(false)); + SearchIT.assertSearchHeader(multiSearchResponse.getResponses()[0].getResponse()); + assertThat(multiSearchResponse.getResponses()[0].getResponse().getHits().getTotalHits(), Matchers.equalTo(1L)); + assertThat(multiSearchResponse.getResponses()[0].getResponse().getHits().getAt(0).getId(), Matchers.equalTo("2")); + + assertThat(multiSearchResponse.getResponses()[1].getFailure(), Matchers.nullValue()); + assertThat(multiSearchResponse.getResponses()[1].isFailure(), Matchers.is(false)); + SearchIT.assertSearchHeader(multiSearchResponse.getResponses()[1].getResponse()); + assertThat(multiSearchResponse.getResponses()[1].getResponse().getHits().getTotalHits(), Matchers.equalTo(1L)); + assertThat(multiSearchResponse.getResponses()[1].getResponse().getHits().getAt(0).getId(), Matchers.equalTo("4")); + + assertThat(multiSearchResponse.getResponses()[2].getFailure(), Matchers.nullValue()); + assertThat(multiSearchResponse.getResponses()[2].isFailure(), Matchers.is(false)); + SearchIT.assertSearchHeader(multiSearchResponse.getResponses()[2].getResponse()); + assertThat(multiSearchResponse.getResponses()[2].getResponse().getHits().getTotalHits(), Matchers.equalTo(1L)); + assertThat(multiSearchResponse.getResponses()[2].getResponse().getHits().getAt(0).getId(), Matchers.equalTo("6")); + + searchRequest1.source().highlighter(new HighlightBuilder().field("field")); + searchRequest2.source().highlighter(new HighlightBuilder().field("field")); + searchRequest3.source().highlighter(new HighlightBuilder().field("field")); + multiSearchResponse = execute(multiSearchRequest, highLevelClient()::multiSearch, highLevelClient()::multiSearchAsync); + assertThat(multiSearchResponse.getTook().millis(), Matchers.greaterThanOrEqualTo(0L)); + assertThat(multiSearchResponse.getResponses().length, Matchers.equalTo(3)); + + assertThat(multiSearchResponse.getResponses()[0].getFailure(), Matchers.nullValue()); + assertThat(multiSearchResponse.getResponses()[0].isFailure(), Matchers.is(false)); + SearchIT.assertSearchHeader(multiSearchResponse.getResponses()[0].getResponse()); + assertThat(multiSearchResponse.getResponses()[0].getResponse().getHits().getTotalHits(), Matchers.equalTo(1L)); + assertThat(multiSearchResponse.getResponses()[0].getResponse().getHits().getAt(0).getHighlightFields() + .get("field").fragments()[0].string(), Matchers.equalTo("value2")); + + assertThat(multiSearchResponse.getResponses()[1].getFailure(), Matchers.nullValue()); + assertThat(multiSearchResponse.getResponses()[1].isFailure(), Matchers.is(false)); + SearchIT.assertSearchHeader(multiSearchResponse.getResponses()[1].getResponse()); + assertThat(multiSearchResponse.getResponses()[1].getResponse().getHits().getTotalHits(), Matchers.equalTo(1L)); + assertThat(multiSearchResponse.getResponses()[1].getResponse().getHits().getAt(0).getId(), Matchers.equalTo("4")); + assertThat(multiSearchResponse.getResponses()[1].getResponse().getHits().getAt(0).getHighlightFields() + .get("field").fragments()[0].string(), Matchers.equalTo("value2")); + + assertThat(multiSearchResponse.getResponses()[2].getFailure(), Matchers.nullValue()); + assertThat(multiSearchResponse.getResponses()[2].isFailure(), Matchers.is(false)); + SearchIT.assertSearchHeader(multiSearchResponse.getResponses()[2].getResponse()); + assertThat(multiSearchResponse.getResponses()[2].getResponse().getHits().getTotalHits(), Matchers.equalTo(1L)); + assertThat(multiSearchResponse.getResponses()[2].getResponse().getHits().getAt(0).getId(), Matchers.equalTo("6")); + assertThat(multiSearchResponse.getResponses()[2].getResponse().getHits().getAt(0).getHighlightFields() + .get("field").fragments()[0].string(), Matchers.equalTo("value2")); + } + + public void testMultiSearch_failure() throws Exception { + MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); + SearchRequest searchRequest1 = new SearchRequest("index1"); + searchRequest1.source().query(new ScriptQueryBuilder(new Script(ScriptType.INLINE, "invalid", "code", Collections.emptyMap()))); + multiSearchRequest.add(searchRequest1); + SearchRequest searchRequest2 = new SearchRequest("index2"); + searchRequest2.source().query(new ScriptQueryBuilder(new Script(ScriptType.INLINE, "invalid", "code", Collections.emptyMap()))); + multiSearchRequest.add(searchRequest2); + + MultiSearchResponse multiSearchResponse = + execute(multiSearchRequest, highLevelClient()::multiSearch, highLevelClient()::multiSearchAsync); + assertThat(multiSearchResponse.getTook().millis(), Matchers.greaterThanOrEqualTo(0L)); + assertThat(multiSearchResponse.getResponses().length, Matchers.equalTo(2)); + + assertThat(multiSearchResponse.getResponses()[0].isFailure(), Matchers.is(true)); + assertThat(multiSearchResponse.getResponses()[0].getFailure().getMessage(), containsString("search_phase_execution_exception")); + assertThat(multiSearchResponse.getResponses()[0].getResponse(), nullValue()); + + assertThat(multiSearchResponse.getResponses()[1].isFailure(), Matchers.is(true)); + assertThat(multiSearchResponse.getResponses()[1].getFailure().getMessage(), containsString("search_phase_execution_exception")); + assertThat(multiSearchResponse.getResponses()[1].getResponse(), nullValue()); + } + private static void assertSearchHeader(SearchResponse searchResponse) { assertThat(searchResponse.getTook().nanos(), greaterThanOrEqualTo(0L)); assertEquals(0, searchResponse.getFailedShards()); diff --git a/core/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java b/core/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java index 76f73bde4b658..7772b24565857 100644 --- a/core/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java +++ b/core/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java @@ -23,20 +23,36 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.CompositeIndicesRequest; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.CheckedBiConsumer; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; import static org.elasticsearch.action.ValidateActions.addValidationError; +import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeBooleanValue; +import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeStringArrayValue; +import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeStringValue; /** * A multi search API request. */ public class MultiSearchRequest extends ActionRequest implements CompositeIndicesRequest { + public static final int MAX_CONCURRENT_SEARCH_REQUESTS_DEFAULT = 0; + private int maxConcurrentSearchRequests = 0; private List requests = new ArrayList<>(); @@ -131,4 +147,171 @@ public void writeTo(StreamOutput out) throws IOException { request.writeTo(out); } } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MultiSearchRequest that = (MultiSearchRequest) o; + return maxConcurrentSearchRequests == that.maxConcurrentSearchRequests && + Objects.equals(requests, that.requests) && + Objects.equals(indicesOptions, that.indicesOptions); + } + + @Override + public int hashCode() { + return Objects.hash(maxConcurrentSearchRequests, requests, indicesOptions); + } + + public static void readMultiLineFormat(BytesReference data, + XContent xContent, + CheckedBiConsumer consumer, + String[] indices, + IndicesOptions indicesOptions, + String[] types, + String routing, + String searchType, + NamedXContentRegistry registry, + boolean allowExplicitIndex) throws IOException { + int from = 0; + int length = data.length(); + byte marker = xContent.streamSeparator(); + while (true) { + int nextMarker = findNextMarker(marker, from, data, length); + if (nextMarker == -1) { + break; + } + // support first line with \n + if (nextMarker == 0) { + from = nextMarker + 1; + continue; + } + + SearchRequest searchRequest = new SearchRequest(); + if (indices != null) { + searchRequest.indices(indices); + } + if (indicesOptions != null) { + searchRequest.indicesOptions(indicesOptions); + } + if (types != null && types.length > 0) { + searchRequest.types(types); + } + if (routing != null) { + searchRequest.routing(routing); + } + if (searchType != null) { + searchRequest.searchType(searchType); + } + IndicesOptions defaultOptions = SearchRequest.DEFAULT_INDICES_OPTIONS; + // now parse the action + if (nextMarker - from > 0) { + try (XContentParser parser = xContent.createParser(registry, data.slice(from, nextMarker - from))) { + Map source = parser.map(); + for (Map.Entry entry : source.entrySet()) { + Object value = entry.getValue(); + if ("index".equals(entry.getKey()) || "indices".equals(entry.getKey())) { + if (!allowExplicitIndex) { + throw new IllegalArgumentException("explicit index in multi search is not allowed"); + } + searchRequest.indices(nodeStringArrayValue(value)); + } else if ("type".equals(entry.getKey()) || "types".equals(entry.getKey())) { + searchRequest.types(nodeStringArrayValue(value)); + } else if ("search_type".equals(entry.getKey()) || "searchType".equals(entry.getKey())) { + searchRequest.searchType(nodeStringValue(value, null)); + } else if ("request_cache".equals(entry.getKey()) || "requestCache".equals(entry.getKey())) { + searchRequest.requestCache(nodeBooleanValue(value, entry.getKey())); + } else if ("preference".equals(entry.getKey())) { + searchRequest.preference(nodeStringValue(value, null)); + } else if ("routing".equals(entry.getKey())) { + searchRequest.routing(nodeStringValue(value, null)); + } + } + defaultOptions = IndicesOptions.fromMap(source, defaultOptions); + } + } + searchRequest.indicesOptions(defaultOptions); + + // move pointers + from = nextMarker + 1; + // now for the body + nextMarker = findNextMarker(marker, from, data, length); + if (nextMarker == -1) { + break; + } + BytesReference bytes = data.slice(from, nextMarker - from); + try (XContentParser parser = xContent.createParser(registry, bytes)) { + consumer.accept(searchRequest, parser); + } + // move pointers + from = nextMarker + 1; + } + } + + private static int findNextMarker(byte marker, int from, BytesReference data, int length) { + for (int i = from; i < length; i++) { + if (data.get(i) == marker) { + return i; + } + } + if (from != length) { + throw new IllegalArgumentException("The msearch request must be terminated by a newline [\n]"); + } + return -1; + } + + public static byte[] writeMultiLineFormat(MultiSearchRequest multiSearchRequest, XContent xContent) throws IOException { + ByteArrayOutputStream output = new ByteArrayOutputStream(); + for (SearchRequest request : multiSearchRequest.requests()) { + try (XContentBuilder xContentBuilder = XContentBuilder.builder(xContent)) { + xContentBuilder.startObject(); + if (request.indices() != null) { + xContentBuilder.field("index", request.indices()); + } + if (request.indicesOptions() != null && request.indicesOptions() != SearchRequest.DEFAULT_INDICES_OPTIONS) { + if (request.indicesOptions().expandWildcardsOpen() && request.indicesOptions().expandWildcardsClosed()) { + xContentBuilder.field("expand_wildcards", "all"); + } else if (request.indicesOptions().expandWildcardsOpen()) { + xContentBuilder.field("expand_wildcards", "open"); + } else if (request.indicesOptions().expandWildcardsClosed()) { + xContentBuilder.field("expand_wildcards", "closed"); + } else { + xContentBuilder.field("expand_wildcards", "none"); + } + xContentBuilder.field("ignore_unavailable", request.indicesOptions().ignoreUnavailable()); + xContentBuilder.field("allow_no_indices", request.indicesOptions().allowNoIndices()); + } + if (request.types() != null) { + xContentBuilder.field("types", request.types()); + } + if (request.searchType() != null) { + xContentBuilder.field("search_type", request.searchType().name().toLowerCase(Locale.ROOT)); + } + if (request.requestCache() != null) { + xContentBuilder.field("request_cache", request.requestCache()); + } + if (request.preference() != null) { + xContentBuilder.field("preference", request.preference()); + } + if (request.routing() != null) { + xContentBuilder.field("routing", request.routing()); + } + xContentBuilder.endObject(); + xContentBuilder.bytes().writeTo(output); + } + output.write(xContent.streamSeparator()); + try (XContentBuilder xContentBuilder = XContentBuilder.builder(xContent)) { + if (request.source() != null) { + request.source().toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); + } else { + xContentBuilder.startObject(); + xContentBuilder.endObject(); + } + xContentBuilder.bytes().writeTo(output); + } + output.write(xContent.streamSeparator()); + } + return output.toByteArray(); + } + } diff --git a/core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java b/core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java index 560379a6ce2f6..cb30385ecc868 100644 --- a/core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java +++ b/core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java @@ -24,23 +24,39 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParser.Token; import java.io.IOException; import java.util.Arrays; import java.util.Iterator; +import java.util.List; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; /** * A multi search response. */ public class MultiSearchResponse extends ActionResponse implements Iterable, ToXContentObject { + private static final ParseField RESPONSES = new ParseField(Fields.RESPONSES); + private static final ParseField TOOK_IN_MILLIS = new ParseField("took"); + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("multi_search", + true, a -> new MultiSearchResponse(((List)a[0]).toArray(new Item[0]), (long) a[1])); + static { + PARSER.declareObjectArray(constructorArg(), (p, c) -> itemFromXContent(p), RESPONSES); + PARSER.declareLong(constructorArg(), TOOK_IN_MILLIS); + } + /** * A search response item, holding the actual search response, or an error message if it failed. */ @@ -188,6 +204,45 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } + public static MultiSearchResponse fromXContext(XContentParser parser) { + return PARSER.apply(parser, null); + } + + private static MultiSearchResponse.Item itemFromXContent(XContentParser parser) throws IOException { + // This parsing logic is a bit tricky here, because the multi search response itself is tricky: + // 1) The json objects inside the responses array are either a search response or a serialized exception + // 2) Each response json object gets a status field injected that ElasticsearchException.failureFromXContent(...) does not parse, + // but SearchResponse.innerFromXContent(...) parses and then ignores. The status field is not needed to parse + // the response item. However in both cases this method does need to parse the 'status' field otherwise the parsing of + // the response item in the next json array element will fail due to parsing errors. + + Item item = null; + String fieldName = null; + + Token token = parser.nextToken(); + assert token == Token.FIELD_NAME; + outer: for (; token != Token.END_OBJECT; token = parser.nextToken()) { + switch (token) { + case FIELD_NAME: + fieldName = parser.currentName(); + if ("error".equals(fieldName)) { + item = new Item(null, ElasticsearchException.failureFromXContent(parser)); + } else if ("status".equals(fieldName) == false) { + item = new Item(SearchResponse.innerFromXContent(parser), null); + break outer; + } + break; + case VALUE_NUMBER: + if ("status".equals(fieldName)) { + // Ignore the status value + } + break; + } + } + assert parser.currentToken() == Token.END_OBJECT; + return item; + } + static final class Fields { static final String RESPONSES = "responses"; static final String STATUS = "status"; diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchResponse.java b/core/src/main/java/org/elasticsearch/action/search/SearchResponse.java index 02d2e6a3429d4..a8bbd6989185b 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchResponse.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchResponse.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParser.Token; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.RestActions; import org.elasticsearch.search.SearchHits; @@ -242,9 +243,14 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t } public static SearchResponse fromXContent(XContentParser parser) throws IOException { - ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); - XContentParser.Token token; - String currentFieldName = null; + ensureExpectedToken(Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); + parser.nextToken(); + return innerFromXContent(parser); + } + + static SearchResponse innerFromXContent(XContentParser parser) throws IOException { + ensureExpectedToken(Token.FIELD_NAME, parser.currentToken(), parser::getTokenLocation); + String currentFieldName = parser.currentName(); SearchHits hits = null; Aggregations aggs = null; Suggest suggest = null; @@ -259,8 +265,8 @@ public static SearchResponse fromXContent(XContentParser parser) throws IOExcept String scrollId = null; List failures = new ArrayList<>(); Clusters clusters = Clusters.EMPTY; - while((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { + for (Token token = parser.nextToken(); token != Token.END_OBJECT; token = parser.nextToken()) { + if (token == Token.FIELD_NAME) { currentFieldName = parser.currentName(); } else if (token.isValue()) { if (SCROLL_ID.match(currentFieldName)) { @@ -276,7 +282,7 @@ public static SearchResponse fromXContent(XContentParser parser) throws IOExcept } else { parser.skipChildren(); } - } else if (token == XContentParser.Token.START_OBJECT) { + } else if (token == Token.START_OBJECT) { if (SearchHits.Fields.HITS.equals(currentFieldName)) { hits = SearchHits.fromXContent(parser); } else if (Aggregations.AGGREGATIONS_FIELD.equals(currentFieldName)) { @@ -286,8 +292,8 @@ public static SearchResponse fromXContent(XContentParser parser) throws IOExcept } else if (SearchProfileShardResults.PROFILE_FIELD.equals(currentFieldName)) { profile = SearchProfileShardResults.fromXContent(parser); } else if (RestActions._SHARDS_FIELD.match(currentFieldName)) { - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { + while ((token = parser.nextToken()) != Token.END_OBJECT) { + if (token == Token.FIELD_NAME) { currentFieldName = parser.currentName(); } else if (token.isValue()) { if (RestActions.FAILED_FIELD.match(currentFieldName)) { @@ -301,9 +307,9 @@ public static SearchResponse fromXContent(XContentParser parser) throws IOExcept } else { parser.skipChildren(); } - } else if (token == XContentParser.Token.START_ARRAY) { + } else if (token == Token.START_ARRAY) { if (RestActions.FAILURES_FIELD.match(currentFieldName)) { - while((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + while((token = parser.nextToken()) != Token.END_ARRAY) { failures.add(ShardSearchFailure.fromXContent(parser)); } } else { diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java index 9dec3be5c1b11..371314b990c41 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java @@ -76,7 +76,7 @@ protected void doExecute(MultiSearchRequest request, ActionListener { - try { - searchRequest.source(SearchSourceBuilder.fromXContent(parser)); - multiRequest.add(searchRequest); - } catch (IOException e) { - throw new ElasticsearchParseException("Exception when parsing search request", e); - } + searchRequest.source(SearchSourceBuilder.fromXContent(parser)); + multiRequest.add(searchRequest); }); List requests = multiRequest.requests(); preFilterShardSize = Math.max(1, preFilterShardSize / (requests.size()+1)); @@ -113,7 +110,7 @@ public static MultiSearchRequest parseRequest(RestRequest restRequest, boolean a * Parses a multi-line {@link RestRequest} body, instantiating a {@link SearchRequest} for each line and applying the given consumer. */ public static void parseMultiLineRequest(RestRequest request, IndicesOptions indicesOptions, boolean allowExplicitIndex, - BiConsumer consumer) throws IOException { + CheckedBiConsumer consumer) throws IOException { String[] indices = Strings.splitStringByCommaToArray(request.param("index")); String[] types = Strings.splitStringByCommaToArray(request.param("type")); @@ -123,83 +120,8 @@ public static void parseMultiLineRequest(RestRequest request, IndicesOptions ind final Tuple sourceTuple = request.contentOrSourceParam(); final XContent xContent = sourceTuple.v1().xContent(); final BytesReference data = sourceTuple.v2(); - - int from = 0; - int length = data.length(); - byte marker = xContent.streamSeparator(); - while (true) { - int nextMarker = findNextMarker(marker, from, data, length); - if (nextMarker == -1) { - break; - } - // support first line with \n - if (nextMarker == 0) { - from = nextMarker + 1; - continue; - } - - SearchRequest searchRequest = new SearchRequest(); - if (indices != null) { - searchRequest.indices(indices); - } - if (indicesOptions != null) { - searchRequest.indicesOptions(indicesOptions); - } - if (types != null && types.length > 0) { - searchRequest.types(types); - } - if (routing != null) { - searchRequest.routing(routing); - } - if (searchType != null) { - searchRequest.searchType(searchType); - } - - IndicesOptions defaultOptions = IndicesOptions.strictExpandOpenAndForbidClosed(); - - - // now parse the action - if (nextMarker - from > 0) { - try (XContentParser parser = xContent.createParser(request.getXContentRegistry(), data.slice(from, nextMarker - from))) { - Map source = parser.map(); - for (Map.Entry entry : source.entrySet()) { - Object value = entry.getValue(); - if ("index".equals(entry.getKey()) || "indices".equals(entry.getKey())) { - if (!allowExplicitIndex) { - throw new IllegalArgumentException("explicit index in multi search is not allowed"); - } - searchRequest.indices(nodeStringArrayValue(value)); - } else if ("type".equals(entry.getKey()) || "types".equals(entry.getKey())) { - searchRequest.types(nodeStringArrayValue(value)); - } else if ("search_type".equals(entry.getKey()) || "searchType".equals(entry.getKey())) { - searchRequest.searchType(nodeStringValue(value, null)); - } else if ("request_cache".equals(entry.getKey()) || "requestCache".equals(entry.getKey())) { - searchRequest.requestCache(nodeBooleanValue(value, entry.getKey())); - } else if ("preference".equals(entry.getKey())) { - searchRequest.preference(nodeStringValue(value, null)); - } else if ("routing".equals(entry.getKey())) { - searchRequest.routing(nodeStringValue(value, null)); - } - } - defaultOptions = IndicesOptions.fromMap(source, defaultOptions); - } - } - searchRequest.indicesOptions(defaultOptions); - - // move pointers - from = nextMarker + 1; - // now for the body - nextMarker = findNextMarker(marker, from, data, length); - if (nextMarker == -1) { - break; - } - BytesReference bytes = data.slice(from, nextMarker - from); - try (XContentParser parser = xContent.createParser(request.getXContentRegistry(), bytes)) { - consumer.accept(searchRequest, parser); - } - // move pointers - from = nextMarker + 1; - } + MultiSearchRequest.readMultiLineFormat(data, xContent, consumer, indices, indicesOptions, types, routing, + searchType, request.getXContentRegistry(), allowExplicitIndex); } @Override @@ -207,18 +129,6 @@ public boolean supportsContentStream() { return true; } - private static int findNextMarker(byte marker, int from, BytesReference data, int length) { - for (int i = from; i < length; i++) { - if (data.get(i) == marker) { - return i; - } - } - if (from != length) { - throw new IllegalArgumentException("The msearch request must be terminated by a newline [\n]"); - } - return -1; - } - @Override protected Set responseParams() { return RESPONSE_PARAMS; diff --git a/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java b/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java index e6de1d859d867..faec42b2587d1 100644 --- a/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java @@ -19,24 +19,36 @@ package org.elasticsearch.action.search; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.CheckedBiConsumer; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.search.RestMultiSearchAction; +import org.elasticsearch.search.Scroll; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.StreamsUtils; import org.elasticsearch.test.rest.FakeRestRequest; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.function.BiConsumer; import static java.util.Collections.singletonList; +import static org.elasticsearch.search.RandomSearchRequestGenerator.randomSearchRequest; +import static org.elasticsearch.test.EqualsHashCodeTestUtils.checkEqualsAndHashCode; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; @@ -202,4 +214,87 @@ protected NamedXContentRegistry xContentRegistry() { return new NamedXContentRegistry(singletonList(new NamedXContentRegistry.Entry(QueryBuilder.class, new ParseField(MatchAllQueryBuilder.NAME), (p, c) -> MatchAllQueryBuilder.fromXContent(p)))); } + + public void testMultiLineSerialization() throws IOException { + int iters = 16; + for (int i = 0; i < iters; i++) { + // The only formats that support stream separator + XContentType xContentType = randomFrom(XContentType.JSON, XContentType.SMILE); + MultiSearchRequest originalRequest = createMultiSearchRequest(); + + byte[] originalBytes = MultiSearchRequest.writeMultiLineFormat(originalRequest, xContentType.xContent()); + MultiSearchRequest parsedRequest = new MultiSearchRequest(); + CheckedBiConsumer consumer = (r, p) -> { + SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.fromXContent(p); + if (searchSourceBuilder.equals(new SearchSourceBuilder()) == false) { + r.source(searchSourceBuilder); + } + parsedRequest.add(r); + }; + MultiSearchRequest.readMultiLineFormat(new BytesArray(originalBytes), xContentType.xContent(), + consumer, null, null, null, null, null, xContentRegistry(), true); + assertEquals(originalRequest, parsedRequest); + } + } + + public void testEqualsAndHashcode() throws IOException { + checkEqualsAndHashCode(createMultiSearchRequest(), MultiSearchRequestTests::copyRequest, MultiSearchRequestTests::mutate); + } + + private static MultiSearchRequest mutate(MultiSearchRequest searchRequest) throws IOException { + MultiSearchRequest mutation = copyRequest(searchRequest); + List> mutators = new ArrayList<>(); + mutators.add(() -> mutation.indicesOptions(randomValueOtherThan(searchRequest.indicesOptions(), + () -> IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())))); + mutators.add(() -> mutation.maxConcurrentSearchRequests(randomIntBetween(1, 32))); + mutators.add(() -> mutation.add(createSimpleSearchRequest())); + randomFrom(mutators).run(); + return mutation; + } + + private static MultiSearchRequest copyRequest(MultiSearchRequest request) throws IOException { + MultiSearchRequest copy = new MultiSearchRequest(); + if (request.maxConcurrentSearchRequests() > 0) { + copy.maxConcurrentSearchRequests(request.maxConcurrentSearchRequests()); + } + copy.indicesOptions(request.indicesOptions()); + for (SearchRequest searchRequest : request.requests()) { + copy.add(searchRequest); + } + return copy; + } + + private static MultiSearchRequest createMultiSearchRequest() throws IOException { + int numSearchRequest = randomIntBetween(1, 128); + MultiSearchRequest request = new MultiSearchRequest(); + for (int j = 0; j < numSearchRequest; j++) { + SearchRequest searchRequest = createSimpleSearchRequest(); + + // scroll is not supported in the current msearch api, so unset it: + searchRequest.scroll((Scroll) null); + + // only expand_wildcards, ignore_unavailable and allow_no_indices can be specified from msearch api, so unset other options: + IndicesOptions randomlyGenerated = searchRequest.indicesOptions(); + IndicesOptions msearchDefault = IndicesOptions.strictExpandOpenAndForbidClosed(); + searchRequest.indicesOptions(IndicesOptions.fromOptions( + randomlyGenerated.ignoreUnavailable(), randomlyGenerated.allowNoIndices(), randomlyGenerated.expandWildcardsOpen(), + randomlyGenerated.expandWildcardsClosed(), msearchDefault.allowAliasesToMultipleIndices(), + msearchDefault.forbidClosedIndices(), msearchDefault.ignoreAliases() + )); + + request.add(searchRequest); + } + return request; + } + + private static SearchRequest createSimpleSearchRequest() throws IOException { + return randomSearchRequest(() -> { + // No need to return a very complex SearchSourceBuilder here, that is tested elsewhere + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.from(randomInt(10)); + searchSourceBuilder.size(randomIntBetween(20, 100)); + return searchSourceBuilder; + }); + } + } diff --git a/core/src/test/java/org/elasticsearch/action/search/MultiSearchResponseTests.java b/core/src/test/java/org/elasticsearch/action/search/MultiSearchResponseTests.java new file mode 100644 index 0000000000000..e31f593193d2e --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/search/MultiSearchResponseTests.java @@ -0,0 +1,87 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.search; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static org.elasticsearch.test.XContentTestUtils.insertRandomFields; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; + +public class MultiSearchResponseTests extends ESTestCase { + + public void testFromXContent() throws IOException { + for (int runs = 0; runs < 20; runs++) { + MultiSearchResponse expected = createTestInstance(); + XContentType xContentType = randomFrom(XContentType.values()); + BytesReference shuffled = toShuffledXContent(expected, xContentType, ToXContent.EMPTY_PARAMS, false); + XContentParser parser = createParser(XContentFactory.xContent(xContentType), shuffled); + MultiSearchResponse actual = MultiSearchResponse.fromXContext(parser); + assertThat(parser.nextToken(), nullValue()); + + assertThat(actual.getTook(), equalTo(expected.getTook())); + assertThat(actual.getResponses().length, equalTo(expected.getResponses().length)); + for (int i = 0; i < expected.getResponses().length; i++) { + MultiSearchResponse.Item expectedItem = expected.getResponses()[i]; + MultiSearchResponse.Item actualItem = actual.getResponses()[i]; + if (expectedItem.isFailure()) { + assertThat(actualItem.getResponse(), nullValue()); + assertThat(actualItem.getFailureMessage(), containsString(expectedItem.getFailureMessage())); + } else { + assertThat(actualItem.getResponse().toString(), equalTo(expectedItem.getResponse().toString())); + assertThat(actualItem.getFailure(), nullValue()); + } + } + } + } + + private static MultiSearchResponse createTestInstance() { + int numItems = randomIntBetween(0, 128); + MultiSearchResponse.Item[] items = new MultiSearchResponse.Item[numItems]; + for (int i = 0; i < numItems; i++) { + if (randomBoolean()) { + // Creating a minimal response is OK, because SearchResponse self + // is tested elsewhere. + long tookInMillis = randomNonNegativeLong(); + int totalShards = randomIntBetween(1, Integer.MAX_VALUE); + int successfulShards = randomIntBetween(0, totalShards); + int skippedShards = totalShards - successfulShards; + InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty(); + SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalShards, successfulShards, skippedShards); + SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, totalShards, + successfulShards, skippedShards, tookInMillis, ShardSearchFailure.EMPTY_ARRAY, clusters); + items[i] = new MultiSearchResponse.Item(searchResponse, null); + } else { + items[i] = new MultiSearchResponse.Item(null, new ElasticsearchException("an error")); + } + } + return new MultiSearchResponse(items, randomNonNegativeLong()); + } + +} diff --git a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/RestMultiSearchTemplateAction.java b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/RestMultiSearchTemplateAction.java index f129a5b15ec7c..fd797c4340a8f 100644 --- a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/RestMultiSearchTemplateAction.java +++ b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/RestMultiSearchTemplateAction.java @@ -77,16 +77,12 @@ public static MultiSearchTemplateRequest parseRequest(RestRequest restRequest, b RestMultiSearchAction.parseMultiLineRequest(restRequest, multiRequest.indicesOptions(), allowExplicitIndex, (searchRequest, bytes) -> { - try { - SearchTemplateRequest searchTemplateRequest = RestSearchTemplateAction.parse(bytes); - if (searchTemplateRequest.getScript() != null) { - searchTemplateRequest.setRequest(searchRequest); - multiRequest.add(searchTemplateRequest); - } else { - throw new IllegalArgumentException("Malformed search template"); - } - } catch (IOException e) { - throw new ElasticsearchParseException("Exception when parsing search template request", e); + SearchTemplateRequest searchTemplateRequest = RestSearchTemplateAction.parse(bytes); + if (searchTemplateRequest.getScript() != null) { + searchTemplateRequest.setRequest(searchRequest); + multiRequest.add(searchTemplateRequest); + } else { + throw new IllegalArgumentException("Malformed search template"); } }); return multiRequest;