diff --git a/build.gradle b/build.gradle index c4d5a6ee86496..ed9f50b67e958 100644 --- a/build.gradle +++ b/build.gradle @@ -160,8 +160,8 @@ task verifyVersions { * after the backport of the backcompat code is complete. */ -boolean bwc_tests_enabled = true -final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */ +boolean bwc_tests_enabled = false +final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/40177" /* place a PR link here when committing bwc changes */ if (bwc_tests_enabled == false) { if (bwc_tests_disabled_issue.isEmpty()) { throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false") diff --git a/docs/reference/modules/cross-cluster-search.asciidoc b/docs/reference/modules/cross-cluster-search.asciidoc index 9c1b1b3697a63..ad20b9e360ced 100644 --- a/docs/reference/modules/cross-cluster-search.asciidoc +++ b/docs/reference/modules/cross-cluster-search.asciidoc @@ -4,7 +4,8 @@ The _{ccs}_ feature allows any node to act as a federated client across multiple clusters. A {ccs} node won't join the remote cluster, instead it connects to a remote cluster in a light fashion in order to execute -federated search requests. +federated search requests. For details on communication and compatibility +between different clusters, see <>. [float] === Using {ccs} @@ -43,7 +44,8 @@ PUT _cluster/settings // TEST[s/127.0.0.1:9300/\${transport_host}/] To search the `twitter` index on remote cluster `cluster_one` the index name -must be prefixed with the cluster alias separated by a `:` character: +must be prefixed with the alias of the remote cluster followed by the `:` +character: [source,js] -------------------------------------------------- @@ -104,7 +106,7 @@ GET /cluster_one:twitter/_search // TESTRESPONSE[s/"_score": 1/"_score": "$body.hits.hits.0._score"/] -Indices can also be searched with the same name on different clusters: +Indices with the same name on different clusters can also be searched: [source,js] -------------------------------------------------- @@ -120,10 +122,10 @@ GET /cluster_one:twitter,twitter/_search // CONSOLE // TEST[continued] -Search results are disambiguated the same way as the indices are disambiguated in the request. Even if index names are -identical these indices will be treated as different indices when results are merged. All results retrieved from a -remote index -will be prefixed with their remote cluster name: +Search results are disambiguated the same way as the indices are disambiguated in the request. +Indices with same names are treated as different indices when results are merged. All results +retrieved from an index located in a remote cluster are prefixed with their corresponding +cluster alias: [source,js] -------------------------------------------------- @@ -185,10 +187,11 @@ will be prefixed with their remote cluster name: [float] === Skipping disconnected clusters -By default all remote clusters that are searched via {ccs} need to be available when -the search request is executed, otherwise the whole request fails and no search results are returned -despite some of the clusters are available. Remote clusters can be made optional through the -boolean `skip_unavailable` setting, set to `false` by default. +By default, all remote clusters that are searched via {ccs} need to be +available when the search request is executed. Otherwise, the whole request +fails; even if some of the clusters are available, no search results are +returned. You can use the boolean `skip_unavailable` setting to make remote +clusters optional. By default, it is set to `false`. [source,js] -------------------------------- @@ -312,6 +315,3 @@ The <> supports the `ccs_minimize_roundtrips` parameter, which defaults to `true` and can be set to `false` in case minimizing network round-trips is not desirable. -Note that all the communication between the nodes, regardless of which cluster -they belong to and the selected reduce mode, happens through the -<>. diff --git a/docs/reference/modules/remote-clusters.asciidoc b/docs/reference/modules/remote-clusters.asciidoc index 314d95cc79eeb..b134a626739fa 100644 --- a/docs/reference/modules/remote-clusters.asciidoc +++ b/docs/reference/modules/remote-clusters.asciidoc @@ -14,16 +14,46 @@ connections to a remote cluster. This functionality is used in endif::[] Remote cluster connections work by configuring a remote cluster and connecting -only to a limited number of nodes in the remote cluster. Each remote cluster is -referenced by a name and a list of seed nodes. When a remote cluster is -registered, its cluster state is retrieved from one of the seed nodes so that by -default up to three _gateway nodes_ are selected to be connected to as part of -remote cluster requests. Remote cluster connections consist of uni-directional -connections from the coordinating node to the previously selected remote nodes -only. You can tag which nodes should be selected by using node attributes (see <>). +only to a limited number of nodes in that remote cluster. Each remote cluster +is referenced by a name and a list of seed nodes. When a remote cluster is +registered, its cluster state is retrieved from one of the seed nodes and up +to three _gateway nodes_ are selected to be connected to as part of remote +cluster requests. All the communication required between different clusters +goes through the <>. Remote cluster +connections consist of uni-directional connections from the coordinating +node to the selected remote _gateway nodes_ only. -Each node in a cluster that has remote clusters configured connects to one or -more _gateway nodes_ and uses them to federate requests to the remote cluster. +[float] +[[gateway-nodes-selection]] +=== Gateway nodes selection + +The _gateway nodes_ selection depends on the following criteria: + +- *version*: Remote nodes must be compatible with the cluster they are +registered to. This is subject to the same rules as <>. +Any node can communicate with any other node on the same major version (e.g. +6.0 can talk to any 6.x node). Only nodes on the last minor version of a +certain major version can communicate with nodes on the following major +version (e.g. 6.7 can communicate with 7.0, as well as any 7.x node, while + 6.6 or earlier cannot talk to any 7.x node). Note that version compatibility + is symmetric, meaning that if 6.7 can communicate with 7.0, 7.0 can also + communicate with 6.7. The matrix below summarizes compatibility as described + above. + +[cols="^,^,^,^,^,^"] +|==== +| Compatibility | 5.0->5.5 | 5.6 | 6.0->6.6 | 6.7 | 7.x +| 5.0->5.5 | Yes | Yes | No | No | No +| 5.6 | Yes | Yes | Yes | Yes | No +| 6.0->6.6 | No | Yes | Yes | Yes | No +| 6.7 | No | Yes | Yes | Yes | Yes +| 7.x | No | No | No | Yes | Yes +|==== + +- *role*: Dedicated master nodes never get selected. +- *attributes*: You can tag which nodes should be selected +(see <>), though such tagged nodes still have +to satisfy the two above requirements. [float] [[configuring-remote-clusters]] @@ -31,7 +61,8 @@ more _gateway nodes_ and uses them to federate requests to the remote cluster. You can configure remote clusters globally by using <>, which you can update dynamically. -Alternatively, you can configure them locally on individual nodes by using the `elasticsearch.yml` file. +Alternatively, you can configure them locally on individual nodes by using the + `elasticsearch.yml` file. If you specify the settings in `elasticsearch.yml` files, only the nodes with those settings can connect to the remote cluster. In other words, functionality @@ -59,7 +90,8 @@ between local and remote indices. For more information about the optional transport settings, see <>. -If you use <>, the remote clusters are available on every node in the cluster. For example: +If you use <>, the remote clusters +are available on every node in the cluster. For example: [source,js] -------------------------------- diff --git a/docs/reference/modules/snapshots.asciidoc b/docs/reference/modules/snapshots.asciidoc index f87469cf55962..58b59be6a5718 100644 --- a/docs/reference/modules/snapshots.asciidoc +++ b/docs/reference/modules/snapshots.asciidoc @@ -28,6 +28,10 @@ back up a cluster is by using the snapshot and restore functionality. [float] === Version compatibility +IMPORTANT: Version compatibility refers to the underlying Lucene index +compatibility. Follow the <> +when migrating between versions. + A snapshot contains a copy of the on-disk data structures that make up an index. This means that snapshots can only be restored to versions of Elasticsearch that can read the indices: diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 751e28d16bc89..910b82c8ccbf4 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -42,7 +42,6 @@ import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.InternalAggregations; -import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.dfs.DfsSearchResult; @@ -66,8 +65,6 @@ import java.util.Map; import java.util.function.Function; import java.util.function.IntFunction; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; public final class SearchPhaseController { @@ -491,8 +488,8 @@ private ReducedQueryPhase reducedQueryPhase(Collection aggregationsList) { - ReduceContext reduceContext = reduceContextFunction.apply(false); - return aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList, - null, reduceContext); - } - - private static InternalAggregations reduceAggs(List aggregationsList, - List pipelineAggregators, ReduceContext reduceContext) { - InternalAggregations aggregations = InternalAggregations.reduce(aggregationsList, reduceContext); - if (pipelineAggregators != null) { - List newAggs = StreamSupport.stream(aggregations.spliterator(), false) - .map((p) -> (InternalAggregation) p) - .collect(Collectors.toList()); - for (SiblingPipelineAggregator pipelineAggregator : pipelineAggregators) { - InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), reduceContext); - newAggs.add(newAgg); - } - return new InternalAggregations(newAggs); - } - return aggregations; - } - public static final class ReducedQueryPhase { // the sum of all hits across all reduces shards final TotalHits totalHits; @@ -646,7 +617,8 @@ public void consumeResult(SearchPhaseResult result) { private synchronized void consumeInternal(QuerySearchResult querySearchResult) { if (index == bufferSize) { if (hasAggs) { - InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(aggsBuffer)); + ReduceContext reduceContext = controller.reduceContextFunction.apply(false); + InternalAggregations reducedAggs = InternalAggregations.reduce(Arrays.asList(aggsBuffer), reduceContext); Arrays.fill(aggsBuffer, null); aggsBuffer[0] = reducedAggs; } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java index 999ea9b20548d..84a536c18db01 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java @@ -168,10 +168,16 @@ SearchResponse getMergedResponse(Clusters clusters) { assert trackTotalHits == null || trackTotalHits; trackTotalHits = true; } + TopDocs topDocs = searchHitsToTopDocs(searchHits, totalHits, shards); topDocsStats.add(new TopDocsAndMaxScore(topDocs, searchHits.getMaxScore()), searchResponse.isTimedOut(), searchResponse.isTerminatedEarly()); - topDocsList.add(topDocs); + if (searchHits.getHits().length > 0) { + //there is no point in adding empty search hits and merging them with the others. Also, empty search hits always come + //without sort fields and collapse info, despite sort by field and/or field collapsing was requested, which causes + //issues reconstructing the proper TopDocs instance and breaks mergeTopDocs which expects the same type for each result. + topDocsList.add(topDocs); + } } //after going through all the hits and collecting all their distinct shards, we can assign shardIndex and set it to the ScoreDocs @@ -297,12 +303,17 @@ private static void setShardIndex(Map shards, L } private static SearchHits topDocsToSearchHits(TopDocs topDocs, TopDocsStats topDocsStats) { - SearchHit[] searchHits = new SearchHit[topDocs.scoreDocs.length]; - for (int i = 0; i < topDocs.scoreDocs.length; i++) { - FieldDocAndSearchHit scoreDoc = (FieldDocAndSearchHit)topDocs.scoreDocs[i]; - searchHits[i] = scoreDoc.searchHit; + SearchHit[] searchHits; + if (topDocs == null) { + //merged TopDocs is null whenever all clusters have returned empty hits + searchHits = new SearchHit[0]; + } else { + searchHits = new SearchHit[topDocs.scoreDocs.length]; + for (int i = 0; i < topDocs.scoreDocs.length; i++) { + FieldDocAndSearchHit scoreDoc = (FieldDocAndSearchHit)topDocs.scoreDocs[i]; + searchHits[i] = scoreDoc.searchHit; + } } - SortField[] sortFields = null; String collapseField = null; Object[] collapseValues = null; @@ -370,7 +381,7 @@ public int compareTo(ShardIdAndClusterAlias o) { } int clusterAliasCompareTo = clusterAlias.compareTo(o.clusterAlias); if (clusterAliasCompareTo != 0) { - //TODO we may want to fix this, CCS returns remote results before local ones (TransportSearchAction#mergeShardsIterators) + //CCS returns remote results before local ones (TransportSearchAction#mergeShardsIterators), fixed from 7.1 on if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { return 1; } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 519f2c88e0e58..597ca216bf275 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -80,6 +80,7 @@ import java.util.function.Function; import java.util.function.LongSupplier; +import static org.elasticsearch.action.search.SearchType.DFS_QUERY_THEN_FETCH; import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH; public class TransportSearchAction extends HandledTransportAction { @@ -248,6 +249,9 @@ static boolean shouldMinimizeRoundtrips(SearchRequest searchRequest) { if (searchRequest.scroll() != null) { return false; } + if (searchRequest.searchType() == DFS_QUERY_THEN_FETCH) { + return false; + } SearchSourceBuilder source = searchRequest.source(); return source == null || source.collapse() == null || source.collapse().getInnerHits() == null || source.collapse().getInnerHits().isEmpty(); @@ -311,8 +315,6 @@ public void onFailure(Exception e) { if (localIndices != null) { ActionListener ccsListener = createCCSListener(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, false, countDown, skippedClusters, exceptions, searchResponseMerger, totalClusters, listener); - //here we provide the empty string a cluster alias, which means no prefix in index name, - //but the coord node will perform non final reduce as it's not null. SearchRequest ccsLocalSearchRequest = SearchRequest.withLocalReduction(searchRequest, localIndices.indices(), RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false); localSearchConsumer.accept(ccsLocalSearchRequest, ccsListener); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java index d6eb73514d9c3..9683651391cc2 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java @@ -161,8 +161,7 @@ public AggParseContext(String name) { } } - public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory[0], - new ArrayList()); + public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory[0], new ArrayList<>()); private AggregatorFactory[] factories; private List pipelineAggregatorFactories; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java index eafdbe109776b..b525fd32d918a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java @@ -61,7 +61,7 @@ public ReduceContext(BigArrays bigArrays, ScriptService scriptService, IntConsum /** * Returns true iff the current reduce phase is the final reduce phase. This indicates if operations like * pipeline aggregations should be applied or if specific features like {@code minDocCount} should be taken into account. - * Operations that are potentially loosing information can only be applied during the final reduce phase. + * Operations that are potentially losing information can only be applied during the final reduce phase. */ public boolean isFinalReduce() { return isFinalReduce; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index 95140b50d2bdf..70135c2d51e73 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -18,17 +18,22 @@ */ package org.elasticsearch.search.aggregations; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import static java.util.Collections.emptyMap; @@ -48,23 +53,56 @@ public final class InternalAggregations extends Aggregations implements Streamab } }; + private List topLevelPipelineAggregators = Collections.emptyList(); + private InternalAggregations() { } /** - * Constructs a new addAggregation. + * Constructs a new aggregation. */ public InternalAggregations(List aggregations) { super(aggregations); } /** - * Reduces the given lists of addAggregation. - * - * @param aggregationsList A list of aggregation to reduce - * @return The reduced addAggregation + * Constructs a new aggregation providing its {@link InternalAggregation}s and {@link SiblingPipelineAggregator}s + */ + public InternalAggregations(List aggregations, List topLevelPipelineAggregators) { + super(aggregations); + this.topLevelPipelineAggregators = Objects.requireNonNull(topLevelPipelineAggregators); + } + + /** + * Returns the top-level pipeline aggregators. + * Note that top-level pipeline aggregators become normal aggregation once the final reduction has been performed, after which they + * become part of the list of {@link InternalAggregation}s. */ - public static InternalAggregations reduce(List aggregationsList, ReduceContext context) { + List getTopLevelPipelineAggregators() { + return topLevelPipelineAggregators; + } + + /** + * Reduces the given list of aggregations as well as the top-level pipeline aggregators extracted from the first + * {@link InternalAggregations} object found in the list. + * Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched. + */ + public static InternalAggregations reduce(List aggregationsList, + ReduceContext context) { + if (aggregationsList.isEmpty()) { + return null; + } + InternalAggregations first = aggregationsList.get(0); + return reduce(aggregationsList, first.topLevelPipelineAggregators, context); + } + + /** + * Reduces the given list of aggregations as well as the provided top-level pipeline aggregators. + * Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched. + */ + public static InternalAggregations reduce(List aggregationsList, + List topLevelPipelineAggregators, + ReduceContext context) { if (aggregationsList.isEmpty()) { return null; } @@ -89,7 +127,15 @@ public static InternalAggregations reduce(List aggregation InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand reducedAggregations.add(first.reduce(aggregations, context)); } - return new InternalAggregations(reducedAggregations); + + if (context.isFinalReduce()) { + for (SiblingPipelineAggregator pipelineAggregator : topLevelPipelineAggregators) { + InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(reducedAggregations), context); + reducedAggregations.add(newAgg); + } + return new InternalAggregations(reducedAggregations); + } + return new InternalAggregations(reducedAggregations, topLevelPipelineAggregators); } public static InternalAggregations readAggregations(StreamInput in) throws IOException { @@ -104,11 +150,20 @@ public void readFrom(StreamInput in) throws IOException { if (aggregations.isEmpty()) { aggregationsAsMap = emptyMap(); } + if (in.getVersion().onOrAfter(Version.V_6_7_0)) { + this.topLevelPipelineAggregators = in.readList( + stream -> (SiblingPipelineAggregator)in.readNamedWriteable(PipelineAggregator.class)); + } else { + this.topLevelPipelineAggregators = Collections.emptyList(); + } } @Override @SuppressWarnings("unchecked") public void writeTo(StreamOutput out) throws IOException { out.writeNamedWriteableList((List)aggregations); + if (out.getVersion().onOrAfter(Version.V_6_7_0)) { + out.writeNamedWriteableList(topLevelPipelineAggregators); + } } } diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 43654823914b4..55787dfc53a35 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -36,10 +36,11 @@ import org.elasticsearch.search.suggest.Suggest; import java.io.IOException; +import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; -import static java.util.Collections.emptyList; import static org.elasticsearch.common.lucene.Lucene.readTopDocs; import static org.elasticsearch.common.lucene.Lucene.writeTopDocs; @@ -54,7 +55,7 @@ public final class QuerySearchResult extends SearchPhaseResult { private DocValueFormat[] sortValueFormats; private InternalAggregations aggregations; private boolean hasAggs; - private List pipelineAggregators; + private List pipelineAggregators = Collections.emptyList(); private Suggest suggest; private boolean searchTimedOut; private Boolean terminatedEarly = null; @@ -80,7 +81,6 @@ public QuerySearchResult queryResult() { return this; } - public void searchTimedOut(boolean searchTimedOut) { this.searchTimedOut = searchTimedOut; } @@ -204,7 +204,7 @@ public List pipelineAggregators() { } public void pipelineAggregators(List pipelineAggregators) { - this.pipelineAggregators = pipelineAggregators; + this.pipelineAggregators = Objects.requireNonNull(pipelineAggregators); } public Suggest suggest() { @@ -338,7 +338,7 @@ public void writeToNoId(StreamOutput out) throws IOException { out.writeBoolean(true); aggregations.writeTo(out); } - out.writeNamedWriteableList(pipelineAggregators == null ? emptyList() : pipelineAggregators); + out.writeNamedWriteableList(pipelineAggregators); if (suggest == null) { out.writeBoolean(false); } else { diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java index c32ff7b88f8b9..d3aa59d7b7e8c 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java @@ -504,6 +504,72 @@ public void testMergeNoResponsesAdded() { assertEquals(0, response.getShardFailures().length); } + public void testMergeEmptySearchHitsWithNonEmpty() { + long currentRelativeTime = randomLong(); + final SearchTimeProvider timeProvider = new SearchTimeProvider(randomLong(), 0, () -> currentRelativeTime); + SearchResponseMerger merger = new SearchResponseMerger(0, 10, Integer.MAX_VALUE, timeProvider, flag -> null); + SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); + int numFields = randomIntBetween(1, 3); + SortField[] sortFields = new SortField[numFields]; + for (int i = 0; i < numFields; i++) { + sortFields[i] = new SortField("field-" + i, SortField.Type.INT, randomBoolean()); + } + PriorityQueue priorityQueue = new PriorityQueue<>(new SearchHitComparator(sortFields)); + SearchHit[] hits = randomSearchHitArray(10, 1, "remote", new Index[]{new Index("index", "uuid")}, Float.NaN, 1, + sortFields, priorityQueue); + { + SearchHits searchHits = new SearchHits(hits, new TotalHits(10, TotalHits.Relation.EQUAL_TO), Float.NaN, sortFields, null, null); + InternalSearchResponse response = new InternalSearchResponse(searchHits, null, null, null, false, false, 1); + SearchResponse searchResponse = new SearchResponse(response, null, 1, 1, 0, 1L, + ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY); + merger.add(searchResponse); + } + { + SearchHits empty = new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN, null, null, null); + InternalSearchResponse response = new InternalSearchResponse(empty, null, null, null, false, false, 1); + SearchResponse searchResponse = new SearchResponse(response, null, 1, 1, 0, 1L, + ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY); + merger.add(searchResponse); + } + assertEquals(2, merger.numResponses()); + SearchResponse mergedResponse = merger.getMergedResponse(clusters); + assertEquals(10, mergedResponse.getHits().getTotalHits().value); + assertEquals(10, mergedResponse.getHits().getHits().length); + assertEquals(2, mergedResponse.getTotalShards()); + assertEquals(2, mergedResponse.getSuccessfulShards()); + assertEquals(0, mergedResponse.getSkippedShards()); + assertArrayEquals(sortFields, mergedResponse.getHits().getSortFields()); + assertArrayEquals(hits, mergedResponse.getHits().getHits()); + assertEquals(clusters, mergedResponse.getClusters()); + } + + public void testMergeOnlyEmptyHits() { + long currentRelativeTime = randomLong(); + final SearchTimeProvider timeProvider = new SearchTimeProvider(randomLong(), 0, () -> currentRelativeTime); + SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); + Tuple randomTrackTotalHits = randomTrackTotalHits(); + int trackTotalHitsUpTo = randomTrackTotalHits.v1(); + TotalHits.Relation totalHitsRelation = randomTrackTotalHits.v2(); + SearchResponseMerger merger = new SearchResponseMerger(0, 10, trackTotalHitsUpTo, timeProvider, flag -> null); + int numResponses = randomIntBetween(1, 5); + TotalHits expectedTotalHits = null; + for (int i = 0; i < numResponses; i++) { + TotalHits totalHits = null; + if (trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_DISABLED) { + totalHits = new TotalHits(randomLongBetween(0, 1000), totalHitsRelation); + long previousValue = expectedTotalHits == null ? 0 : expectedTotalHits.value; + expectedTotalHits = new TotalHits(Math.min(previousValue + totalHits.value, trackTotalHitsUpTo), totalHitsRelation); + } + SearchHits empty = new SearchHits(new SearchHit[0], totalHits, Float.NaN, null, null, null); + InternalSearchResponse response = new InternalSearchResponse(empty, null, null, null, false, false, 1); + SearchResponse searchResponse = new SearchResponse(response, null, 1, 1, 0, 1L, + ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY); + merger.add(searchResponse); + } + SearchResponse mergedResponse = merger.getMergedResponse(clusters); + assertEquals(expectedTotalHits, mergedResponse.getHits().getTotalHits()); + } + private static Tuple randomTrackTotalHits() { switch(randomIntBetween(0, 2)) { case 0: diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 9a9524d0ff57e..2165895974e27 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -412,7 +412,8 @@ public void testCCSRemoteReduce() throws Exception { OriginalIndices localIndices = local ? new OriginalIndices(new String[]{"index"}, SearchRequest.DEFAULT_INDICES_OPTIONS) : null; int totalClusters = numClusters + (local ? 1 : 0); TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0); - Function reduceContext = finalReduce -> null; + Function reduceContext = + finalReduce -> new InternalAggregation.ReduceContext(null, null, finalReduce); try (MockTransportService service = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null)) { service.start(); service.acceptIncomingRequests(); @@ -805,11 +806,17 @@ public void testShouldMinimizeRoundtrips() throws Exception { collapseBuilder.setInnerHits(new InnerHitBuilder("inner")); assertFalse(TransportSearchAction.shouldMinimizeRoundtrips(searchRequest)); } + { + SearchRequest searchRequest = new SearchRequest(); + searchRequest.searchType(SearchType.DFS_QUERY_THEN_FETCH); + assertFalse(TransportSearchAction.shouldMinimizeRoundtrips(searchRequest)); + } { SearchRequestTests searchRequestTests = new SearchRequestTests(); searchRequestTests.setUp(); SearchRequest searchRequest = searchRequestTests.createSearchRequest(); searchRequest.scroll((Scroll)null); + searchRequest.searchType(SearchType.QUERY_THEN_FETCH); SearchSourceBuilder source = searchRequest.source(); if (source != null) { CollapseBuilder collapse = source.collapse(); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java new file mode 100644 index 0000000000000..3212c18cf278f --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java @@ -0,0 +1,155 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations; + +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogramTests; +import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; +import org.elasticsearch.search.aggregations.bucket.terms.StringTermsTests; +import org.elasticsearch.search.aggregations.pipeline.AvgBucketPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValueTests; +import org.elasticsearch.search.aggregations.pipeline.MaxBucketPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class InternalAggregationsTests extends ESTestCase { + + private final NamedWriteableRegistry registry = new NamedWriteableRegistry( + new SearchModule(Settings.EMPTY, false, Collections.emptyList()).getNamedWriteables()); + + public void testReduceEmptyAggs() { + List aggs = Collections.emptyList(); + InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, randomBoolean()); + assertNull(InternalAggregations.reduce(aggs, Collections.emptyList(), reduceContext)); + } + + public void testNonFinalReduceTopLevelPipelineAggs() throws IOException { + InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), + 10, 1, Collections.emptyList(), Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0); + List aggs = Collections.singletonList(new InternalAggregations(Collections.singletonList(terms))); + List topLevelPipelineAggs = new ArrayList<>(); + MaxBucketPipelineAggregationBuilder maxBucketPipelineAggregationBuilder = new MaxBucketPipelineAggregationBuilder("test", "test"); + topLevelPipelineAggs.add((SiblingPipelineAggregator)maxBucketPipelineAggregationBuilder.create()); + InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, false); + InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, topLevelPipelineAggs, reduceContext); + assertEquals(1, reducedAggs.getTopLevelPipelineAggregators().size()); + assertEquals(1, reducedAggs.aggregations.size()); + } + + public void testFinalReduceTopLevelPipelineAggs() throws IOException { + InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), + 10, 1, Collections.emptyList(), Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0); + + MaxBucketPipelineAggregationBuilder maxBucketPipelineAggregationBuilder = new MaxBucketPipelineAggregationBuilder("test", "test"); + SiblingPipelineAggregator siblingPipelineAggregator = (SiblingPipelineAggregator) maxBucketPipelineAggregationBuilder.create(); + InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, true); + final InternalAggregations reducedAggs; + if (randomBoolean()) { + InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms), + Collections.singletonList(siblingPipelineAggregator)); + reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext); + } else { + InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms)); + List topLevelPipelineAggs = Collections.singletonList(siblingPipelineAggregator); + reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), topLevelPipelineAggs, reduceContext); + } + assertEquals(0, reducedAggs.getTopLevelPipelineAggregators().size()); + assertEquals(2, reducedAggs.aggregations.size()); + } + + public void testSerialization() throws Exception { + List aggsList = new ArrayList<>(); + if (randomBoolean()) { + StringTermsTests stringTermsTests = new StringTermsTests(); + stringTermsTests.init(); + stringTermsTests.setUp(); + aggsList.add(stringTermsTests.createTestInstance()); + } + if (randomBoolean()) { + InternalDateHistogramTests dateHistogramTests = new InternalDateHistogramTests(); + dateHistogramTests.setUp(); + aggsList.add(dateHistogramTests.createTestInstance()); + } + if (randomBoolean()) { + InternalSimpleValueTests simpleValueTests = new InternalSimpleValueTests(); + aggsList.add(simpleValueTests.createTestInstance()); + } + List topLevelPipelineAggs = new ArrayList<>(); + if (randomBoolean()) { + if (randomBoolean()) { + topLevelPipelineAggs.add((SiblingPipelineAggregator)new MaxBucketPipelineAggregationBuilder("name1", "bucket1").create()); + } + if (randomBoolean()) { + topLevelPipelineAggs.add((SiblingPipelineAggregator)new AvgBucketPipelineAggregationBuilder("name2", "bucket2").create()); + } + if (randomBoolean()) { + topLevelPipelineAggs.add((SiblingPipelineAggregator)new SumBucketPipelineAggregationBuilder("name3", "bucket3").create()); + } + } + InternalAggregations aggregations = new InternalAggregations(aggsList, topLevelPipelineAggs); + writeToAndReadFrom(aggregations, 0); + } + + private void writeToAndReadFrom(InternalAggregations aggregations, int iteration) throws IOException { + Version version = VersionUtils.randomVersion(random()); + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.setVersion(version); + aggregations.writeTo(out); + try (StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(out.bytes().toBytesRef().bytes), registry)) { + in.setVersion(version); + InternalAggregations deserialized = InternalAggregations.readAggregations(in); + assertEquals(aggregations.aggregations, deserialized.aggregations); + if (aggregations.getTopLevelPipelineAggregators() == null) { + assertEquals(0, deserialized.getTopLevelPipelineAggregators().size()); + } else { + if (version.before(Version.V_6_7_0)) { + assertEquals(0, deserialized.getTopLevelPipelineAggregators().size()); + } else { + assertEquals(aggregations.getTopLevelPipelineAggregators().size(), + deserialized.getTopLevelPipelineAggregators().size()); + for (int i = 0; i < aggregations.getTopLevelPipelineAggregators().size(); i++) { + SiblingPipelineAggregator siblingPipelineAggregator1 = aggregations.getTopLevelPipelineAggregators().get(i); + SiblingPipelineAggregator siblingPipelineAggregator2 = deserialized.getTopLevelPipelineAggregators().get(i); + assertArrayEquals(siblingPipelineAggregator1.bucketsPaths(), siblingPipelineAggregator2.bucketsPaths()); + assertEquals(siblingPipelineAggregator1.name(), siblingPipelineAggregator2.name()); + } + } + } + if (iteration < 2) { + //serialize this enough times to make sure that we are able to write again what we read + writeToAndReadFrom(deserialized, iteration + 1); + } + } + } + } +}