diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index aad2638bd9de3..91aec1171dcd6 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -37,8 +37,10 @@ import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.transport.Transport; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -62,6 +64,7 @@ abstract class AbstractSearchAsyncAction exten private final long clusterStateVersion; private final Map aliasFilter; private final Map concreteIndexBoosts; + private final Map> indexRoutings; private final SetOnce> shardFailures = new SetOnce<>(); private final Object shardFailuresMutex = new Object(); private final AtomicInteger successfulOps = new AtomicInteger(); @@ -72,6 +75,7 @@ abstract class AbstractSearchAsyncAction exten protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService, BiFunction nodeIdToConnection, Map aliasFilter, Map concreteIndexBoosts, + Map> indexRoutings, Executor executor, SearchRequest request, ActionListener listener, GroupShardsIterator shardsIts, TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion, @@ -89,6 +93,7 @@ protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportS this.clusterStateVersion = clusterStateVersion; this.concreteIndexBoosts = concreteIndexBoosts; this.aliasFilter = aliasFilter; + this.indexRoutings = indexRoutings; this.results = resultConsumer; this.clusters = clusters; } @@ -128,17 +133,17 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha onPhaseFailure(currentPhase, "all shards failed", cause); } else { Boolean allowPartialResults = request.allowPartialSearchResults(); - assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults"; + assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults"; if (allowPartialResults == false && shardFailures.get() != null ){ if (logger.isDebugEnabled()) { final ShardOperationFailedException[] shardSearchFailures = ExceptionsHelper.groupBy(buildShardFailures()); Throwable cause = shardSearchFailures.length == 0 ? null : ElasticsearchException.guessRootCauses(shardSearchFailures[0].getCause())[0]; - logger.debug(() -> new ParameterizedMessage("{} shards failed for phase: [{}]", + logger.debug(() -> new ParameterizedMessage("{} shards failed for phase: [{}]", shardSearchFailures.length, getName()), cause); } - onPhaseFailure(currentPhase, "Partial shards failure", null); - } else { + onPhaseFailure(currentPhase, "Partial shards failure", null); + } else { if (logger.isTraceEnabled()) { final String resultsFrom = results.getSuccessfulResults() .map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(",")); @@ -271,14 +276,14 @@ public final SearchRequest getRequest() { @Override public final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) { - + ShardSearchFailure[] failures = buildShardFailures(); Boolean allowPartialResults = request.allowPartialSearchResults(); assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults"; if (allowPartialResults == false && failures.length > 0){ - raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures)); - } - + raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures)); + } + return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(), skippedOps.get(), buildTookInMillis(), failures, clusters); } @@ -318,8 +323,11 @@ public final ShardSearchTransportRequest buildShardSearchRequest(SearchShardIter AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID()); assert filter != null; float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST); + String indexName = shardIt.shardId().getIndex().getName(); + final String[] routings = indexRoutings.getOrDefault(indexName, Collections.emptySet()) + .toArray(new String[0]); return new ShardSearchTransportRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(), - filter, indexBoost, timeProvider.getAbsoluteStartMillis(), clusterAlias); + filter, indexBoost, timeProvider.getAbsoluteStartMillis(), clusterAlias, routings); } /** diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index fe42d50393635..0873ff40f7500 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -27,6 +27,7 @@ import org.elasticsearch.transport.Transport; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executor; import java.util.function.BiFunction; import java.util.function.Function; @@ -47,6 +48,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction nodeIdToConnection, Map aliasFilter, Map concreteIndexBoosts, + Map> indexRoutings, Executor executor, SearchRequest request, ActionListener listener, GroupShardsIterator shardsIts, TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion, @@ -56,9 +58,9 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction 0) { int maxConcurrentShardRequests = Math.min(this.maxConcurrentShardRequests, shardsIts.size()); final boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests); - assert success; + assert success; assert request.allowPartialSearchResults() != null : "SearchRequest missing setting for allowPartialSearchResults"; if (request.allowPartialSearchResults() == false) { final StringBuilder missingShards = new StringBuilder(); @@ -140,7 +140,7 @@ public final void run() throws IOException { final SearchShardIterator shardRoutings = shardsIts.get(index); if (shardRoutings.size() == 0) { if(missingShards.length() >0 ){ - missingShards.append(", "); + missingShards.append(", "); } missingShards.append(shardRoutings.shardId()); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index 9bcbe1c8e6760..0782fbb310b65 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.transport.Transport; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executor; import java.util.function.BiFunction; @@ -37,11 +38,13 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction SearchDfsQueryThenFetchAsyncAction(final Logger logger, final SearchTransportService searchTransportService, final BiFunction nodeIdToConnection, final Map aliasFilter, - final Map concreteIndexBoosts, final SearchPhaseController searchPhaseController, final Executor executor, + final Map concreteIndexBoosts, final Map> indexRoutings, + final SearchPhaseController searchPhaseController, final Executor executor, final SearchRequest request, final ActionListener listener, final GroupShardsIterator shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider, final long clusterStateVersion, final SearchTask task, SearchResponse.Clusters clusters) { - super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener, + super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings, + executor, request, listener, shardsIts, timeProvider, clusterStateVersion, task, new ArraySearchPhaseResults<>(shardsIts.size()), request.getMaxConcurrentShardRequests(), clusters); this.searchPhaseController = searchPhaseController; diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index b7669312b0088..bbd84011de00b 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.transport.Transport; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executor; import java.util.function.BiFunction; @@ -37,13 +38,14 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction nodeIdToConnection, final Map aliasFilter, - final Map concreteIndexBoosts, final SearchPhaseController searchPhaseController, final Executor executor, + final Map concreteIndexBoosts, final Map> indexRoutings, + final SearchPhaseController searchPhaseController, final Executor executor, final SearchRequest request, final ActionListener listener, final GroupShardsIterator shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion, SearchTask task, SearchResponse.Clusters clusters) { - super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener, - shardsIts, timeProvider, clusterStateVersion, task, searchPhaseController.newSearchPhaseResults(request, shardsIts.size()), - request.getMaxConcurrentShardRequests(), clusters); + super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings, + executor, request, listener, shardsIts, timeProvider, clusterStateVersion, task, + searchPhaseController.newSearchPhaseResults(request, shardsIts.size()), request.getMaxConcurrentShardRequests(), clusters); this.searchPhaseController = searchPhaseController; } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index bd533ce7b097a..6b39af478f432 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -297,6 +297,7 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea Map aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap); Map> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(), searchRequest.indices()); + routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap); String[] concreteIndices = new String[indices.length]; for (int i = 0; i < indices.length; i++) { concreteIndices[i] = indices[i].getName(); @@ -350,7 +351,7 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea } boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators); searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(), - Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener, preFilterSearchShards, clusters).start(); + Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start(); } private boolean shouldPreFilterSearchShards(SearchRequest searchRequest, GroupShardsIterator shardIterators) { @@ -380,17 +381,20 @@ private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchReque GroupShardsIterator shardIterators, SearchTimeProvider timeProvider, BiFunction connectionLookup, - long clusterStateVersion, Map aliasFilter, + long clusterStateVersion, + Map aliasFilter, Map concreteIndexBoosts, - ActionListener listener, boolean preFilter, + Map> indexRoutings, + ActionListener listener, + boolean preFilter, SearchResponse.Clusters clusters) { Executor executor = threadPool.executor(ThreadPool.Names.SEARCH); if (preFilter) { return new CanMatchPreFilterSearchPhase(logger, searchTransportService, connectionLookup, - aliasFilter, concreteIndexBoosts, executor, searchRequest, listener, shardIterators, + aliasFilter, concreteIndexBoosts, indexRoutings, executor, searchRequest, listener, shardIterators, timeProvider, clusterStateVersion, task, (iter) -> { AbstractSearchAsyncAction action = searchAsyncAction(task, searchRequest, iter, timeProvider, connectionLookup, - clusterStateVersion, aliasFilter, concreteIndexBoosts, listener, false, clusters); + clusterStateVersion, aliasFilter, concreteIndexBoosts, indexRoutings, listener, false, clusters); return new SearchPhase(action.getName()) { @Override public void run() throws IOException { @@ -403,14 +407,14 @@ public void run() throws IOException { switch (searchRequest.searchType()) { case DFS_QUERY_THEN_FETCH: searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup, - aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, - timeProvider, clusterStateVersion, task, clusters); + aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, searchRequest, listener, + shardIterators, timeProvider, clusterStateVersion, task, clusters); break; case QUERY_AND_FETCH: case QUERY_THEN_FETCH: searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup, - aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, - timeProvider, clusterStateVersion, task, clusters); + aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, searchRequest, listener, + shardIterators, timeProvider, clusterStateVersion, task, clusters); break; default: throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]"); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java b/server/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java index 6cb1989a8dd02..e9a99b7b456c4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java @@ -24,7 +24,7 @@ /** * A simple {@link ShardsIterator} that iterates a list or sub-list of - * {@link ShardRouting shard routings}. + * {@link ShardRouting shard indexRoutings}. */ public class PlainShardsIterator implements ShardsIterator { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java index be1213ad134f1..6a9a105b6c432 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java @@ -38,7 +38,7 @@ /** * {@link ShardRouting} immutably encapsulates information about shard - * routings like id, state, version, etc. + * indexRoutings like id, state, version, etc. */ public final class ShardRouting implements Writeable, ToXContentObject { @@ -477,7 +477,7 @@ public boolean isRelocationTargetOf(ShardRouting other) { "ShardRouting is a relocation target but current node id isn't equal to source relocating node. This [" + this + "], other [" + other + "]"; assert b == false || this.shardId.equals(other.shardId) : - "ShardRouting is a relocation target but both routings are not of the same shard id. This [" + this + "], other [" + other + "]"; + "ShardRouting is a relocation target but both indexRoutings are not of the same shard id. This [" + this + "], other [" + other + "]"; assert b == false || this.primary == other.primary : "ShardRouting is a relocation target but primary flag is different. This [" + this + "], target [" + other + "]"; @@ -504,7 +504,7 @@ public boolean isRelocationSourceOf(ShardRouting other) { "ShardRouting is a relocation source but relocating node isn't equal to other's current node. This [" + this + "], other [" + other + "]"; assert b == false || this.shardId.equals(other.shardId) : - "ShardRouting is a relocation source but both routings are not of the same shard. This [" + this + "], target [" + other + "]"; + "ShardRouting is a relocation source but both indexRoutings are not of the same shard. This [" + this + "], target [" + other + "]"; assert b == false || this.primary == other.primary : "ShardRouting is a relocation source but primary flag is different. This [" + this + "], target [" + other + "]"; diff --git a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java index 1356a1458a2ed..d681a186892db 100644 --- a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java @@ -25,8 +25,10 @@ import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.Query; import org.apache.lucene.util.Counter; +import org.elasticsearch.Version; import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.search.Queries; @@ -91,6 +93,7 @@ final class DefaultSearchContext extends SearchContext { private final Engine.Searcher engineSearcher; private final BigArrays bigArrays; private final IndexShard indexShard; + private final ClusterService clusterService; private final IndexService indexService; private final ContextIndexSearcher searcher; private final DfsSearchResult dfsResult; @@ -120,6 +123,7 @@ final class DefaultSearchContext extends SearchContext { // filter for sliced scroll private SliceBuilder sliceBuilder; private SearchTask task; + private final Version minNodeVersion; /** @@ -152,9 +156,10 @@ final class DefaultSearchContext extends SearchContext { private final QueryShardContext queryShardContext; private FetchPhase fetchPhase; - DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget, Engine.Searcher engineSearcher, - IndexService indexService, IndexShard indexShard, BigArrays bigArrays, Counter timeEstimateCounter, - TimeValue timeout, FetchPhase fetchPhase, String clusterAlias) { + DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget, + Engine.Searcher engineSearcher, ClusterService clusterService, IndexService indexService, + IndexShard indexShard, BigArrays bigArrays, Counter timeEstimateCounter, TimeValue timeout, + FetchPhase fetchPhase, String clusterAlias, Version minNodeVersion) { this.id = id; this.request = request; this.fetchPhase = fetchPhase; @@ -168,9 +173,11 @@ final class DefaultSearchContext extends SearchContext { this.fetchResult = new FetchSearchResult(id, shardTarget); this.indexShard = indexShard; this.indexService = indexService; + this.clusterService = clusterService; this.searcher = new ContextIndexSearcher(engineSearcher, indexService.cache().query(), indexShard.getQueryCachingPolicy()); this.timeEstimateCounter = timeEstimateCounter; this.timeout = timeout; + this.minNodeVersion = minNodeVersion; queryShardContext = indexService.newQueryShardContext(request.shardId().id(), searcher.getIndexReader(), request::nowInMillis, clusterAlias); queryShardContext.setTypes(request.types()); @@ -278,8 +285,7 @@ && new NestedHelper(mapperService()).mightMatchNestedDocs(query) } if (sliceBuilder != null) { - filters.add(sliceBuilder.toFilter(queryShardContext, shardTarget().getShardId().getId(), - queryShardContext.getIndexSettings().getNumberOfShards())); + filters.add(sliceBuilder.toFilter(clusterService, request, queryShardContext, minNodeVersion)); } if (filters.isEmpty()) { diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index a742a3a06ae13..ed7f98c3b0b12 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -616,8 +616,8 @@ private DefaultSearchContext createSearchContext(ShardSearchRequest request, Tim Engine.Searcher engineSearcher = indexShard.acquireSearcher("search"); final DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, - engineSearcher, indexService, indexShard, bigArrays, threadPool.estimatedTimeInMillisCounter(), timeout, fetchPhase, - request.getClusterAlias()); + engineSearcher, clusterService, indexService, indexShard, bigArrays, threadPool.estimatedTimeInMillisCounter(), timeout, + fetchPhase, request.getClusterAlias(), clusterService.state().nodes().getMinNodeVersion()); boolean success = false; try { // we clone the query shard context here just for rewriting otherwise we diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchLocalRequest.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchLocalRequest.java index af52924a2de2c..52892d4d52eb9 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchLocalRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchLocalRequest.java @@ -28,13 +28,10 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryRewriteContext; -import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.query.Rewriteable; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.Scroll; -import org.elasticsearch.search.SearchService; import org.elasticsearch.search.builder.SearchSourceBuilder; import java.io.IOException; @@ -61,7 +58,6 @@ */ public class ShardSearchLocalRequest implements ShardSearchRequest { - private String clusterAlias; private ShardId shardId; private int numberOfShards; @@ -74,17 +70,18 @@ public class ShardSearchLocalRequest implements ShardSearchRequest { private Boolean requestCache; private long nowInMillis; private boolean allowPartialSearchResults; - + private String[] indexRoutings = Strings.EMPTY_ARRAY; + private String preference; private boolean profile; ShardSearchLocalRequest() { } ShardSearchLocalRequest(SearchRequest searchRequest, ShardId shardId, int numberOfShards, - AliasFilter aliasFilter, float indexBoost, long nowInMillis, String clusterAlias) { + AliasFilter aliasFilter, float indexBoost, long nowInMillis, String clusterAlias, String[] indexRoutings) { this(shardId, numberOfShards, searchRequest.searchType(), - searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), aliasFilter, indexBoost, - searchRequest.allowPartialSearchResults()); + searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), aliasFilter, indexBoost, + searchRequest.allowPartialSearchResults(), indexRoutings, searchRequest.preference()); // If allowPartialSearchResults is unset (ie null), the cluster-level default should have been substituted // at this stage. Any NPEs in the above are therefore an error in request preparation logic. assert searchRequest.allowPartialSearchResults() != null; @@ -102,7 +99,8 @@ public ShardSearchLocalRequest(ShardId shardId, String[] types, long nowInMillis } public ShardSearchLocalRequest(ShardId shardId, int numberOfShards, SearchType searchType, SearchSourceBuilder source, String[] types, - Boolean requestCache, AliasFilter aliasFilter, float indexBoost, boolean allowPartialSearchResults) { + Boolean requestCache, AliasFilter aliasFilter, float indexBoost, boolean allowPartialSearchResults, + String[] indexRoutings, String preference) { this.shardId = shardId; this.numberOfShards = numberOfShards; this.searchType = searchType; @@ -112,6 +110,8 @@ public ShardSearchLocalRequest(ShardId shardId, int numberOfShards, SearchType s this.aliasFilter = aliasFilter; this.indexBoost = indexBoost; this.allowPartialSearchResults = allowPartialSearchResults; + this.indexRoutings = indexRoutings; + this.preference = preference; } @@ -169,18 +169,28 @@ public long nowInMillis() { public Boolean requestCache() { return requestCache; } - + @Override public Boolean allowPartialSearchResults() { return allowPartialSearchResults; } - + @Override public Scroll scroll() { return scroll; } + @Override + public String[] indexRoutings() { + return indexRoutings; + } + + @Override + public String preference() { + return preference; + } + @Override public void setProfile(boolean profile) { this.profile = profile; @@ -225,6 +235,13 @@ protected void innerReadFrom(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_6_3_0)) { allowPartialSearchResults = in.readOptionalBoolean(); } + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + indexRoutings = in.readStringArray(); + preference = in.readOptionalString(); + } else { + indexRoutings = Strings.EMPTY_ARRAY; + preference = null; + } } protected void innerWriteTo(StreamOutput out, boolean asKey) throws IOException { @@ -240,7 +257,7 @@ protected void innerWriteTo(StreamOutput out, boolean asKey) throws IOException if (out.getVersion().onOrAfter(Version.V_5_2_0)) { out.writeFloat(indexBoost); } - if (!asKey) { + if (asKey == false) { out.writeVLong(nowInMillis); } out.writeOptionalBoolean(requestCache); @@ -250,7 +267,12 @@ protected void innerWriteTo(StreamOutput out, boolean asKey) throws IOException if (out.getVersion().onOrAfter(Version.V_6_3_0)) { out.writeOptionalBoolean(allowPartialSearchResults); } - + if (asKey == false) { + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeStringArray(indexRoutings); + out.writeOptionalString(preference); + } + } } @Override diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java index 19eb0f17ccc84..0a1513e17d08e 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java @@ -19,7 +19,9 @@ package org.elasticsearch.search.internal; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.CheckedFunction; @@ -28,8 +30,6 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryRewriteContext; -import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.query.Rewriteable; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.AliasFilterParsingException; @@ -68,11 +68,21 @@ public interface ShardSearchRequest { long nowInMillis(); Boolean requestCache(); - + Boolean allowPartialSearchResults(); Scroll scroll(); + /** + * Returns the routing values resolved by the coordinating node for the index pointed by {@link #shardId()}. + */ + String[] indexRoutings(); + + /** + * Returns the preference of the original {@link SearchRequest#preference()}. + */ + String preference(); + /** * Sets if this shard search needs to be profiled or not * @param profile True if the shard should be profiled diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java index ac86d24ed000d..08060a2b249b6 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java @@ -28,9 +28,6 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryRewriteContext; -import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.query.Rewriteable; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.Scroll; @@ -57,9 +54,10 @@ public ShardSearchTransportRequest(){ } public ShardSearchTransportRequest(OriginalIndices originalIndices, SearchRequest searchRequest, ShardId shardId, int numberOfShards, - AliasFilter aliasFilter, float indexBoost, long nowInMillis, String clusterAlias) { + AliasFilter aliasFilter, float indexBoost, long nowInMillis, + String clusterAlias, String[] indexRoutings) { this.shardSearchLocalRequest = new ShardSearchLocalRequest(searchRequest, shardId, numberOfShards, aliasFilter, indexBoost, - nowInMillis, clusterAlias); + nowInMillis, clusterAlias, indexRoutings); this.originalIndices = originalIndices; } @@ -151,17 +149,27 @@ public long nowInMillis() { public Boolean requestCache() { return shardSearchLocalRequest.requestCache(); } - + @Override public Boolean allowPartialSearchResults() { return shardSearchLocalRequest.allowPartialSearchResults(); - } + } @Override public Scroll scroll() { return shardSearchLocalRequest.scroll(); } + @Override + public String[] indexRoutings() { + return shardSearchLocalRequest.indexRoutings(); + } + + @Override + public String preference() { + return shardSearchLocalRequest.preference(); + } + @Override public void readFrom(StreamInput in) throws IOException { throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); diff --git a/server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java b/server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java index aabf0c3fd0c69..06eba08fd7f22 100644 --- a/server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java @@ -23,6 +23,10 @@ import org.apache.lucene.search.MatchNoDocsQuery; import org.apache.lucene.search.Query; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; @@ -30,6 +34,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -39,9 +44,13 @@ import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.search.internal.ShardSearchRequest; import java.io.IOException; +import java.util.Collections; +import java.util.Map; import java.util.Objects; +import java.util.Set; /** * A slice builder allowing to split a scroll in multiple partitions. @@ -203,12 +212,49 @@ public int hashCode() { return Objects.hash(this.field, this.id, this.max); } - public Query toFilter(QueryShardContext context, int shardId, int numShards) { + /** + * Converts this QueryBuilder to a lucene {@link Query}. + * + * @param context Additional information needed to build the query + */ + public Query toFilter(ClusterService clusterService, ShardSearchRequest request, QueryShardContext context, Version minNodeVersion) { final MappedFieldType type = context.fieldMapper(field); if (type == null) { throw new IllegalArgumentException("field " + field + " not found"); } + int shardId = request.shardId().id(); + int numShards = context.getIndexSettings().getNumberOfShards(); + if (minNodeVersion.onOrAfter(Version.V_7_0_0_alpha1) && + (request.preference() != null || request.indexRoutings().length > 0)) { + GroupShardsIterator group = buildShardIterator(clusterService, request); + assert group.size() <= numShards : "index routing shards: " + group.size() + + " cannot be greater than total number of shards: " + numShards; + if (group.size() < numShards) { + /** + * The routing of this request targets a subset of the shards of this index so we need to we retrieve + * the original {@link GroupShardsIterator} and compute the request shard id and number of + * shards from it. + * This behavior has been added in {@link Version#V_7_0_0_alpha1} so if there is another node in the cluster + * with an older version we use the original shard id and number of shards in order to ensure that all + * slices use the same numbers. + */ + numShards = group.size(); + int ord = 0; + shardId = -1; + // remap the original shard id with its index (position) in the sorted shard iterator. + for (ShardIterator it : group) { + assert it.shardId().getIndex().equals(request.shardId().getIndex()); + if (request.shardId().equals(it.shardId())) { + shardId = ord; + break; + } + ++ord; + } + assert shardId != -1 : "shard id: " + request.shardId().getId() + " not found in index shard routing"; + } + } + String field = this.field; boolean useTermQuery = false; if ("_uid".equals(field)) { @@ -273,6 +319,17 @@ public Query toFilter(QueryShardContext context, int shardId, int numShards) { return new MatchAllDocsQuery(); } + /** + * Returns the {@link GroupShardsIterator} for the provided request. + */ + private GroupShardsIterator buildShardIterator(ClusterService clusterService, ShardSearchRequest request) { + final ClusterState state = clusterService.state(); + String[] indices = new String[] { request.shardId().getIndex().getName() }; + Map> routingMap = request.indexRoutings().length > 0 ? + Collections.singletonMap(indices[0], Sets.newHashSet(request.indexRoutings())) : null; + return clusterService.operationRouting().searchShards(state, indices, routingMap, request.preference()); + } + @Override public String toString() { return Strings.toString(this, true, true); diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index 6ade2b8781ecf..193878e2f5e04 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.shard.ShardId; @@ -62,10 +63,15 @@ private AbstractSearchAsyncAction createAction( final SearchRequest request = new SearchRequest(); request.allowPartialSearchResults(true); + request.preference("_shards:1,3"); return new AbstractSearchAsyncAction("test", null, null, null, - Collections.singletonMap("foo", new AliasFilter(new MatchAllQueryBuilder())), Collections.singletonMap("foo", 2.0f), null, - request, null, new GroupShardsIterator<>(Collections.singletonList( - new SearchShardIterator(null, null, Collections.emptyList(), null))), timeProvider, 0, null, + Collections.singletonMap("foo", new AliasFilter(new MatchAllQueryBuilder())), Collections.singletonMap("foo", 2.0f), + Collections.singletonMap("name", Sets.newHashSet("bar", "baz")),null, request, null, + new GroupShardsIterator<>( + Collections.singletonList( + new SearchShardIterator(null, null, Collections.emptyList(), null) + ) + ), timeProvider, 0, null, new InitialSearchPhase.ArraySearchPhaseResults<>(10), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY) { @Override @@ -117,5 +123,8 @@ public void testBuildShardSearchTransportRequest() { assertArrayEquals(new String[] {"name", "name1"}, shardSearchTransportRequest.indices()); assertEquals(new MatchAllQueryBuilder(), shardSearchTransportRequest.getAliasFilter().getQueryBuilder()); assertEquals(2.0f, shardSearchTransportRequest.indexBoost(), 0.0f); + assertArrayEquals(new String[] {"name", "name1"}, shardSearchTransportRequest.indices()); + assertArrayEquals(new String[] {"bar", "baz"}, shardSearchTransportRequest.indexRoutings()); + assertEquals("_shards:1,3", shardSearchTransportRequest.preference()); } } diff --git a/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java index d60f29a5d5395..8b1741967734c 100644 --- a/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -78,12 +78,12 @@ public void sendCanMatch(Transport.Connection connection, ShardSearchTransportRe 2, randomBoolean(), primaryNode, replicaNode); final SearchRequest searchRequest = new SearchRequest(); searchRequest.allowPartialSearchResults(true); - + CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase(logger, searchTransportService, (clusterAlias, node) -> lookup.get(node), Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), - Collections.emptyMap(), EsExecutors.newDirectExecutorService(), + Collections.emptyMap(), Collections.emptyMap(), EsExecutors.newDirectExecutorService(), searchRequest, null, shardsIter, timeProvider, 0, null, (iter) -> new SearchPhase("test") { @Override @@ -159,12 +159,12 @@ public void sendCanMatch(Transport.Connection connection, ShardSearchTransportRe final SearchRequest searchRequest = new SearchRequest(); searchRequest.allowPartialSearchResults(true); - + CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase(logger, searchTransportService, (clusterAlias, node) -> lookup.get(node), Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), - Collections.emptyMap(), EsExecutors.newDirectExecutorService(), + Collections.emptyMap(), Collections.emptyMap(), EsExecutors.newDirectExecutorService(), searchRequest, null, shardsIter, timeProvider, 0, null, (iter) -> new SearchPhase("test") { @Override @@ -222,6 +222,7 @@ public void sendCanMatch( (clusterAlias, node) -> lookup.get(node), Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), Collections.emptyMap(), + Collections.emptyMap(), EsExecutors.newDirectExecutorService(), searchRequest, null, diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index c731d1aaabed0..82e0fcaf5d667 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -106,6 +106,7 @@ public void onFailure(Exception e) { return lookup.get(node); }, aliasFilters, Collections.emptyMap(), + Collections.emptyMap(), null, request, responseListener, @@ -198,6 +199,7 @@ public void onFailure(Exception e) { return lookup.get(node); }, aliasFilters, Collections.emptyMap(), + Collections.emptyMap(), null, request, responseListener, @@ -303,6 +305,7 @@ public void sendFreeContext(Transport.Connection connection, long contextId, Ori return lookup.get(node); }, aliasFilters, Collections.emptyMap(), + Collections.emptyMap(), executor, request, responseListener, diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java index 65526896864d6..8a9b00a8d4ff7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java @@ -83,7 +83,7 @@ public void setUp() throws Exception { } /** - * puts primary shard routings into initializing state + * puts primary shard indexRoutings into initializing state */ private void initPrimaries() { logger.info("adding {} nodes and performing rerouting", this.numberOfReplicas + 1); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTests.java index 055adbaebbce5..349997d7793eb 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTests.java @@ -83,7 +83,7 @@ public void setUp() throws Exception { } /** - * puts primary shard routings into initializing state + * puts primary shard indexRoutings into initializing state */ private void initPrimaries() { logger.info("adding {} nodes and performing rerouting", this.numberOfReplicas + 1); diff --git a/server/src/test/java/org/elasticsearch/index/SearchSlowLogTests.java b/server/src/test/java/org/elasticsearch/index/SearchSlowLogTests.java index 6e8e679188c66..4ef9c36d9306c 100644 --- a/server/src/test/java/org/elasticsearch/index/SearchSlowLogTests.java +++ b/server/src/test/java/org/elasticsearch/index/SearchSlowLogTests.java @@ -122,6 +122,16 @@ public Scroll scroll() { return null; } + @Override + public String[] indexRoutings() { + return null; + } + + @Override + public String preference() { + return null; + } + @Override public void setProfile(boolean profile) { diff --git a/server/src/test/java/org/elasticsearch/routing/AliasRoutingIT.java b/server/src/test/java/org/elasticsearch/routing/AliasRoutingIT.java index 14ec800c3a65d..6b3b6a67a9783 100644 --- a/server/src/test/java/org/elasticsearch/routing/AliasRoutingIT.java +++ b/server/src/test/java/org/elasticsearch/routing/AliasRoutingIT.java @@ -170,7 +170,7 @@ public void testAliasSearchRouting() throws Exception { assertThat(client().prepareSearch("alias1").setSize(0).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getHits().getTotalHits(), equalTo(1L)); } - logger.info("--> search with 0,1 routings , should find two"); + logger.info("--> search with 0,1 indexRoutings , should find two"); for (int i = 0; i < 5; i++) { assertThat(client().prepareSearch().setRouting("0", "1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getHits().getTotalHits(), equalTo(2L)); assertThat(client().prepareSearch().setSize(0).setRouting("0", "1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getHits().getTotalHits(), equalTo(2L)); diff --git a/server/src/test/java/org/elasticsearch/routing/SimpleRoutingIT.java b/server/src/test/java/org/elasticsearch/routing/SimpleRoutingIT.java index 84caed948a2be..0a2a43f2f83d4 100644 --- a/server/src/test/java/org/elasticsearch/routing/SimpleRoutingIT.java +++ b/server/src/test/java/org/elasticsearch/routing/SimpleRoutingIT.java @@ -173,13 +173,13 @@ public void testSimpleSearchRouting() { assertThat(client().prepareSearch().setSize(0).setRouting(secondRoutingValue).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getHits().getTotalHits(), equalTo(1L)); } - logger.info("--> search with {},{} routings , should find two", routingValue, "1"); + logger.info("--> search with {},{} indexRoutings , should find two", routingValue, "1"); for (int i = 0; i < 5; i++) { assertThat(client().prepareSearch().setRouting(routingValue, secondRoutingValue).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getHits().getTotalHits(), equalTo(2L)); assertThat(client().prepareSearch().setSize(0).setRouting(routingValue, secondRoutingValue).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getHits().getTotalHits(), equalTo(2L)); } - logger.info("--> search with {},{},{} routings , should find two", routingValue, secondRoutingValue, routingValue); + logger.info("--> search with {},{},{} indexRoutings , should find two", routingValue, secondRoutingValue, routingValue); for (int i = 0; i < 5; i++) { assertThat(client().prepareSearch().setRouting(routingValue, secondRoutingValue, routingValue).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getHits().getTotalHits(), equalTo(2L)); assertThat(client().prepareSearch().setSize(0).setRouting(routingValue, secondRoutingValue,routingValue).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getHits().getTotalHits(), equalTo(2L)); diff --git a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java index ec422435e4e07..f59cc85c09ccf 100644 --- a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java @@ -112,8 +112,8 @@ public void testPreProcess() throws Exception { IndexReader reader = w.getReader(); Engine.Searcher searcher = new Engine.Searcher("test", new IndexSearcher(reader))) { - DefaultSearchContext context1 = new DefaultSearchContext(1L, shardSearchRequest, null, searcher, indexService, - indexShard, bigArrays, null, timeout, null, null); + DefaultSearchContext context1 = new DefaultSearchContext(1L, shardSearchRequest, null, searcher, null, indexService, + indexShard, bigArrays, null, timeout, null, null, Version.CURRENT); context1.from(300); // resultWindow greater than maxResultWindow and scrollContext is null @@ -153,8 +153,8 @@ public void testPreProcess() throws Exception { + "] index level setting.")); // rescore is null but sliceBuilder is not null - DefaultSearchContext context2 = new DefaultSearchContext(2L, shardSearchRequest, null, searcher, indexService, - indexShard, bigArrays, null, timeout, null, null); + DefaultSearchContext context2 = new DefaultSearchContext(2L, shardSearchRequest, null, searcher, + null, indexService, indexShard, bigArrays, null, timeout, null, null, Version.CURRENT); SliceBuilder sliceBuilder = mock(SliceBuilder.class); int numSlices = maxSlicesPerScroll + randomIntBetween(1, 100); @@ -170,8 +170,8 @@ public void testPreProcess() throws Exception { when(shardSearchRequest.getAliasFilter()).thenReturn(AliasFilter.EMPTY); when(shardSearchRequest.indexBoost()).thenReturn(AbstractQueryBuilder.DEFAULT_BOOST); - DefaultSearchContext context3 = new DefaultSearchContext(3L, shardSearchRequest, null, searcher, indexService, - indexShard, bigArrays, null, timeout, null, null); + DefaultSearchContext context3 = new DefaultSearchContext(3L, shardSearchRequest, null, searcher, null, + indexService, indexShard, bigArrays, null, timeout, null, null, Version.CURRENT); ParsedQuery parsedQuery = ParsedQuery.parsedMatchAllQuery(); context3.sliceBuilder(null).parsedQuery(parsedQuery).preProcess(false); assertEquals(context3.query(), context3.buildFilteredQuery(parsedQuery.query())); diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index f5552ee0d2e46..c58a158fc677d 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -213,7 +213,7 @@ public void onFailure(Exception e) { SearchPhaseResult searchPhaseResult = service.executeQueryPhase( new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT, new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, - true), + true, null, null), new SearchTask(123L, "", "", "", null, Collections.emptyMap())); IntArrayList intCursors = new IntArrayList(1); intCursors.add(0); @@ -249,7 +249,7 @@ public void testTimeout() throws IOException { new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, true) + 1.0f, true, null, null) ); try { // the search context should inherit the default timeout @@ -269,7 +269,7 @@ public void testTimeout() throws IOException { new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, true) + 1.0f, true, null, null) ); try { // the search context should inherit the query timeout @@ -297,12 +297,13 @@ public void testMaxDocvalueFieldsSearch() throws IOException { searchSourceBuilder.docValueField("field" + i); } try (SearchContext context = service.createContext(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT, - searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, true))) { + searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, true, null, null))) { assertNotNull(context); searchSourceBuilder.docValueField("one_field_too_much"); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> service.createContext(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT, - searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, true))); + searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, + true, null, null))); assertEquals( "Trying to retrieve too many docvalue_fields. Must be less than or equal to: [100] but was [101]. " + "This limit can be set by changing the [index.max_docvalue_fields_search] index level setting.", @@ -328,13 +329,14 @@ public void testMaxScriptFieldsSearch() throws IOException { new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); } try (SearchContext context = service.createContext(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT, - searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, true))) { + searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, true, null, null))) { assertNotNull(context); searchSourceBuilder.scriptField("anotherScriptField", new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> service.createContext(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT, - searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, true))); + searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), + 1.0f, true, null, null))); assertEquals( "Trying to retrieve too many script_fields. Must be less than or equal to: [" + maxScriptFields + "] but was [" + (maxScriptFields + 1) @@ -406,28 +408,28 @@ public void testCanMatch() throws IOException { final IndexShard indexShard = indexService.getShard(0); final boolean allowPartialSearchResults = true; assertTrue(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH, null, - Strings.EMPTY_ARRAY, false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults))); + Strings.EMPTY_ARRAY, false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults, null, null))); assertTrue(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH, - new SearchSourceBuilder(), Strings.EMPTY_ARRAY, false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, - allowPartialSearchResults))); + new SearchSourceBuilder(), Strings.EMPTY_ARRAY, false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, + allowPartialSearchResults, null, null))); assertTrue(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH, new SearchSourceBuilder().query(new MatchAllQueryBuilder()), Strings.EMPTY_ARRAY, false, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults))); + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults, null, null))); assertTrue(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH, new SearchSourceBuilder().query(new MatchNoneQueryBuilder()) .aggregation(new TermsAggregationBuilder("test", ValueType.STRING).minDocCount(0)), Strings.EMPTY_ARRAY, false, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults))); + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults, null, null))); assertTrue(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH, new SearchSourceBuilder().query(new MatchNoneQueryBuilder()) .aggregation(new GlobalAggregationBuilder("test")), Strings.EMPTY_ARRAY, false, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults))); + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults, null, null))); assertFalse(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH, new SearchSourceBuilder().query(new MatchNoneQueryBuilder()), Strings.EMPTY_ARRAY, false, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults))); + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults, null, null))); } diff --git a/server/src/test/java/org/elasticsearch/search/internal/ShardSearchTransportRequestTests.java b/server/src/test/java/org/elasticsearch/search/internal/ShardSearchTransportRequestTests.java index c2016ceb02ce7..21a4f099f5a32 100644 --- a/server/src/test/java/org/elasticsearch/search/internal/ShardSearchTransportRequestTests.java +++ b/server/src/test/java/org/elasticsearch/search/internal/ShardSearchTransportRequestTests.java @@ -74,6 +74,8 @@ public void testSerialization() throws Exception { assertEquals(deserializedRequest.searchType(), shardSearchTransportRequest.searchType()); assertEquals(deserializedRequest.shardId(), shardSearchTransportRequest.shardId()); assertEquals(deserializedRequest.numberOfShards(), shardSearchTransportRequest.numberOfShards()); + assertEquals(deserializedRequest.indexRoutings(), shardSearchTransportRequest.indexRoutings()); + assertEquals(deserializedRequest.preference(), shardSearchTransportRequest.preference()); assertEquals(deserializedRequest.cacheKey(), shardSearchTransportRequest.cacheKey()); assertNotSame(deserializedRequest, shardSearchTransportRequest); assertEquals(deserializedRequest.getAliasFilter(), shardSearchTransportRequest.getAliasFilter()); @@ -92,8 +94,10 @@ private ShardSearchTransportRequest createShardSearchTransportRequest() throws I } else { filteringAliases = new AliasFilter(null, Strings.EMPTY_ARRAY); } + final String[] routings = generateRandomStringArray(5, 10, false, true); return new ShardSearchTransportRequest(new OriginalIndices(searchRequest), searchRequest, shardId, - randomIntBetween(1, 100), filteringAliases, randomBoolean() ? 1.0f : randomFloat(), Math.abs(randomLong()), null); + randomIntBetween(1, 100), filteringAliases, randomBoolean() ? 1.0f : randomFloat(), + Math.abs(randomLong()), null, routings); } public void testFilteringAliases() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/search/slice/SearchSliceIT.java b/server/src/test/java/org/elasticsearch/search/slice/SearchSliceIT.java index 2227cbb806b3f..d609f84e4192e 100644 --- a/server/src/test/java/org/elasticsearch/search/slice/SearchSliceIT.java +++ b/server/src/test/java/org/elasticsearch/search/slice/SearchSliceIT.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.slice; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequestBuilder; @@ -48,9 +49,7 @@ import static org.hamcrest.Matchers.startsWith; public class SearchSliceIT extends ESIntegTestCase { - private static final int NUM_DOCS = 1000; - - private int setupIndex(boolean withDocs) throws IOException, ExecutionException, InterruptedException { + private void setupIndex(int numDocs, int numberOfShards) throws IOException, ExecutionException, InterruptedException { String mapping = Strings.toString(XContentFactory.jsonBuilder(). startObject() .startObject("type") @@ -70,74 +69,112 @@ private int setupIndex(boolean withDocs) throws IOException, ExecutionException, .endObject() .endObject() .endObject()); - int numberOfShards = randomIntBetween(1, 7); assertAcked(client().admin().indices().prepareCreate("test") .setSettings(Settings.builder().put("number_of_shards", numberOfShards).put("index.max_slices_per_scroll", 10000)) .addMapping("type", mapping, XContentType.JSON)); ensureGreen(); - if (withDocs == false) { - return numberOfShards; - } - List requests = new ArrayList<>(); - for (int i = 0; i < NUM_DOCS; i++) { - XContentBuilder builder = jsonBuilder(); - builder.startObject(); - builder.field("invalid_random_kw", randomAlphaOfLengthBetween(5, 20)); - builder.field("random_int", randomInt()); - builder.field("static_int", 0); - builder.field("invalid_random_int", randomInt()); - builder.endObject(); + for (int i = 0; i < numDocs; i++) { + XContentBuilder builder = jsonBuilder() + .startObject() + .field("invalid_random_kw", randomAlphaOfLengthBetween(5, 20)) + .field("random_int", randomInt()) + .field("static_int", 0) + .field("invalid_random_int", randomInt()) + .endObject(); requests.add(client().prepareIndex("test", "type").setSource(builder)); } indexRandom(true, requests); - return numberOfShards; } - public void testDocIdSort() throws Exception { - int numShards = setupIndex(true); - SearchResponse sr = client().prepareSearch("test") - .setQuery(matchAllQuery()) - .setSize(0) - .get(); - int numDocs = (int) sr.getHits().getTotalHits(); - assertThat(numDocs, equalTo(NUM_DOCS)); - int max = randomIntBetween(2, numShards*3); + public void testSearchSort() throws Exception { + int numShards = randomIntBetween(1, 7); + int numDocs = randomIntBetween(100, 1000); + setupIndex(numDocs, numShards); + int max = randomIntBetween(2, numShards * 3); for (String field : new String[]{"_id", "random_int", "static_int"}) { int fetchSize = randomIntBetween(10, 100); + // test _doc sort SearchRequestBuilder request = client().prepareSearch("test") .setQuery(matchAllQuery()) .setScroll(new Scroll(TimeValue.timeValueSeconds(10))) .setSize(fetchSize) .addSort(SortBuilders.fieldSort("_doc")); - assertSearchSlicesWithScroll(request, field, max); + assertSearchSlicesWithScroll(request, field, max, numDocs); + + // test numeric sort + request = client().prepareSearch("test") + .setQuery(matchAllQuery()) + .setScroll(new Scroll(TimeValue.timeValueSeconds(10))) + .addSort(SortBuilders.fieldSort("random_int")) + .setSize(fetchSize); + assertSearchSlicesWithScroll(request, field, max, numDocs); } } - public void testNumericSort() throws Exception { - int numShards = setupIndex(true); - SearchResponse sr = client().prepareSearch("test") - .setQuery(matchAllQuery()) - .setSize(0) - .get(); - int numDocs = (int) sr.getHits().getTotalHits(); - assertThat(numDocs, equalTo(NUM_DOCS)); - - int max = randomIntBetween(2, numShards*3); - for (String field : new String[]{"_id", "random_int", "static_int"}) { + public void testWithPreferenceAndRoutings() throws Exception { + int numShards = 10; + int totalDocs = randomIntBetween(100, 1000); + setupIndex(totalDocs, numShards); + { + SearchResponse sr = client().prepareSearch("test") + .setQuery(matchAllQuery()) + .setPreference("_shards:1,4") + .setSize(0) + .get(); + int numDocs = (int) sr.getHits().getTotalHits(); + int max = randomIntBetween(2, numShards * 3); int fetchSize = randomIntBetween(10, 100); SearchRequestBuilder request = client().prepareSearch("test") .setQuery(matchAllQuery()) .setScroll(new Scroll(TimeValue.timeValueSeconds(10))) - .addSort(SortBuilders.fieldSort("random_int")) - .setSize(fetchSize); - assertSearchSlicesWithScroll(request, field, max); + .setSize(fetchSize) + .setPreference("_shards:1,4") + .addSort(SortBuilders.fieldSort("_doc")); + assertSearchSlicesWithScroll(request, "_id", max, numDocs); + } + { + SearchResponse sr = client().prepareSearch("test") + .setQuery(matchAllQuery()) + .setRouting("foo", "bar") + .setSize(0) + .get(); + int numDocs = (int) sr.getHits().getTotalHits(); + int max = randomIntBetween(2, numShards * 3); + int fetchSize = randomIntBetween(10, 100); + SearchRequestBuilder request = client().prepareSearch("test") + .setQuery(matchAllQuery()) + .setScroll(new Scroll(TimeValue.timeValueSeconds(10))) + .setSize(fetchSize) + .setRouting("foo", "bar") + .addSort(SortBuilders.fieldSort("_doc")); + assertSearchSlicesWithScroll(request, "_id", max, numDocs); + } + { + assertAcked(client().admin().indices().prepareAliases() + .addAliasAction(IndicesAliasesRequest.AliasActions.add().index("test").alias("alias1").routing("foo")) + .addAliasAction(IndicesAliasesRequest.AliasActions.add().index("test").alias("alias2").routing("bar")) + .addAliasAction(IndicesAliasesRequest.AliasActions.add().index("test").alias("alias3").routing("baz")) + .get()); + SearchResponse sr = client().prepareSearch("alias1", "alias3") + .setQuery(matchAllQuery()) + .setSize(0) + .get(); + int numDocs = (int) sr.getHits().getTotalHits(); + int max = randomIntBetween(2, numShards * 3); + int fetchSize = randomIntBetween(10, 100); + SearchRequestBuilder request = client().prepareSearch("alias1", "alias3") + .setQuery(matchAllQuery()) + .setScroll(new Scroll(TimeValue.timeValueSeconds(10))) + .setSize(fetchSize) + .addSort(SortBuilders.fieldSort("_doc")); + assertSearchSlicesWithScroll(request, "_id", max, numDocs); } } public void testInvalidFields() throws Exception { - setupIndex(false); + setupIndex(0, 1); SearchPhaseExecutionException exc = expectThrows(SearchPhaseExecutionException.class, () -> client().prepareSearch("test") .setQuery(matchAllQuery()) @@ -161,7 +198,7 @@ public void testInvalidFields() throws Exception { } public void testInvalidQuery() throws Exception { - setupIndex(false); + setupIndex(0, 1); SearchPhaseExecutionException exc = expectThrows(SearchPhaseExecutionException.class, () -> client().prepareSearch() .setQuery(matchAllQuery()) @@ -173,7 +210,7 @@ public void testInvalidQuery() throws Exception { equalTo("`slice` cannot be used outside of a scroll context")); } - private void assertSearchSlicesWithScroll(SearchRequestBuilder request, String field, int numSlice) { + private void assertSearchSlicesWithScroll(SearchRequestBuilder request, String field, int numSlice, int numDocs) { int totalResults = 0; List keys = new ArrayList<>(); for (int id = 0; id < numSlice; id++) { @@ -184,7 +221,7 @@ private void assertSearchSlicesWithScroll(SearchRequestBuilder request, String f int numSliceResults = searchResponse.getHits().getHits().length; String scrollId = searchResponse.getScrollId(); for (SearchHit hit : searchResponse.getHits().getHits()) { - keys.add(hit.getId()); + assertTrue(keys.add(hit.getId())); } while (searchResponse.getHits().getHits().length > 0) { searchResponse = client().prepareSearchScroll("test") @@ -195,15 +232,15 @@ private void assertSearchSlicesWithScroll(SearchRequestBuilder request, String f totalResults += searchResponse.getHits().getHits().length; numSliceResults += searchResponse.getHits().getHits().length; for (SearchHit hit : searchResponse.getHits().getHits()) { - keys.add(hit.getId()); + assertTrue(keys.add(hit.getId())); } } assertThat(numSliceResults, equalTo(expectedSliceResults)); clearScroll(scrollId); } - assertThat(totalResults, equalTo(NUM_DOCS)); - assertThat(keys.size(), equalTo(NUM_DOCS)); - assertThat(new HashSet(keys).size(), equalTo(NUM_DOCS)); + assertThat(totalResults, equalTo(numDocs)); + assertThat(keys.size(), equalTo(numDocs)); + assertThat(new HashSet(keys).size(), equalTo(numDocs)); } private Throwable findRootCause(Exception e) { diff --git a/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java b/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java index 75802e92ee176..d6bcbfa8e6b7d 100644 --- a/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java +++ b/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java @@ -30,19 +30,38 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.RAMDirectory; import org.elasticsearch.Version; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.search.SearchShardIterator; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.OperationRouting; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.fielddata.IndexNumericFieldData; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.index.query.Rewriteable; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.Scroll; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -58,13 +77,138 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class SliceBuilderTests extends ESTestCase { private static final int MAX_SLICE = 20; - private static SliceBuilder randomSliceBuilder() throws IOException { + static class ShardSearchRequestTest implements IndicesRequest, ShardSearchRequest { + private final String[] indices; + private final int shardId; + private final String[] indexRoutings; + private final String preference; + + ShardSearchRequestTest(String index, int shardId, String[] indexRoutings, String preference) { + this.indices = new String[] { index }; + this.shardId = shardId; + this.indexRoutings = indexRoutings; + this.preference = preference; + } + + @Override + public String[] indices() { + return indices; + } + + @Override + public IndicesOptions indicesOptions() { + return null; + } + + @Override + public ShardId shardId() { + return new ShardId(new Index(indices[0], indices[0]), shardId); + } + + @Override + public String[] types() { + return new String[0]; + } + + @Override + public SearchSourceBuilder source() { + return null; + } + + @Override + public AliasFilter getAliasFilter() { + return null; + } + + @Override + public void setAliasFilter(AliasFilter filter) { + + } + + @Override + public void source(SearchSourceBuilder source) { + + } + + @Override + public int numberOfShards() { + return 0; + } + + @Override + public SearchType searchType() { + return null; + } + + @Override + public float indexBoost() { + return 0; + } + + @Override + public long nowInMillis() { + return 0; + } + + @Override + public Boolean requestCache() { + return null; + } + + @Override + public Boolean allowPartialSearchResults() { + return null; + } + + @Override + public Scroll scroll() { + return null; + } + + @Override + public String[] indexRoutings() { + return indexRoutings; + } + + @Override + public String preference() { + return preference; + } + + @Override + public void setProfile(boolean profile) { + + } + + @Override + public boolean isProfile() { + return false; + } + + @Override + public BytesReference cacheKey() throws IOException { + return null; + } + + @Override + public String getClusterAlias() { + return null; + } + + @Override + public Rewriteable getRewriteable() { + return null; + } + } + + private static SliceBuilder randomSliceBuilder() { int max = randomIntBetween(2, MAX_SLICE); int id = randomIntBetween(1, max - 1); String field = randomAlphaOfLengthBetween(5, 20); @@ -75,7 +219,7 @@ private static SliceBuilder serializedCopy(SliceBuilder original) throws IOExcep return copyWriteable(original, new NamedWriteableRegistry(Collections.emptyList()), SliceBuilder::new); } - private static SliceBuilder mutate(SliceBuilder original) throws IOException { + private static SliceBuilder mutate(SliceBuilder original) { switch (randomIntBetween(0, 2)) { case 0: return new SliceBuilder(original.getField() + "_xyz", original.getId(), original.getMax()); case 1: return new SliceBuilder(original.getField(), original.getId() - 1, original.getMax()); @@ -84,6 +228,63 @@ private static SliceBuilder mutate(SliceBuilder original) throws IOException { } } + private IndexSettings createIndexSettings(Version indexVersionCreated, int numShards) { + Settings settings = Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, indexVersionCreated) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .build(); + IndexMetaData indexState = IndexMetaData.builder("index").settings(settings).build(); + return new IndexSettings(indexState, Settings.EMPTY); + } + + private ShardSearchRequest createRequest(int shardId) { + return createRequest(shardId, Strings.EMPTY_ARRAY, null); + } + + private ShardSearchRequest createRequest(int shardId, String[] routings, String preference) { + return new ShardSearchRequestTest("index", shardId, routings, preference); + } + + private QueryShardContext createShardContext(Version indexVersionCreated, IndexReader reader, + String fieldName, DocValuesType dvType, int numShards, int shardId) { + MappedFieldType fieldType = new MappedFieldType() { + @Override + public MappedFieldType clone() { + return null; + } + + @Override + public String typeName() { + return null; + } + + @Override + public Query termQuery(Object value, @Nullable QueryShardContext context) { + return null; + } + + public Query existsQuery(QueryShardContext context) { + return null; + } + }; + fieldType.setName(fieldName); + QueryShardContext context = mock(QueryShardContext.class); + when(context.fieldMapper(fieldName)).thenReturn(fieldType); + when(context.getIndexReader()).thenReturn(reader); + when(context.getShardId()).thenReturn(shardId); + IndexSettings indexSettings = createIndexSettings(indexVersionCreated, numShards); + when(context.getIndexSettings()).thenReturn(indexSettings); + if (dvType != null) { + fieldType.setHasDocValues(true); + fieldType.setDocValuesType(dvType); + IndexNumericFieldData fd = mock(IndexNumericFieldData.class); + when(context.getForField(fieldType)).thenReturn(fd); + } + return context; + + } + public void testSerialization() throws Exception { SliceBuilder original = randomSliceBuilder(); SliceBuilder deserialized = serializedCopy(original); @@ -131,92 +332,41 @@ public void testInvalidArguments() throws Exception { assertEquals("max must be greater than id", e.getMessage()); } - public void testToFilter() throws IOException { + public void testToFilterSimple() throws IOException { Directory dir = new RAMDirectory(); try (IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())))) { writer.commit(); } - QueryShardContext context = mock(QueryShardContext.class); try (IndexReader reader = DirectoryReader.open(dir)) { - MappedFieldType fieldType = new MappedFieldType() { - @Override - public MappedFieldType clone() { - return null; - } - - @Override - public String typeName() { - return null; - } - - @Override - public Query termQuery(Object value, @Nullable QueryShardContext context) { - return null; - } - - public Query existsQuery(QueryShardContext context) { - return null; - } - }; - fieldType.setName(IdFieldMapper.NAME); - fieldType.setHasDocValues(false); - when(context.fieldMapper(IdFieldMapper.NAME)).thenReturn(fieldType); - when(context.getIndexReader()).thenReturn(reader); - Settings settings = Settings.builder() - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 2) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .build(); - IndexMetaData indexState = IndexMetaData.builder("index").settings(settings).build(); - IndexSettings indexSettings = new IndexSettings(indexState, Settings.EMPTY); - when(context.getIndexSettings()).thenReturn(indexSettings); + QueryShardContext context = + createShardContext(Version.CURRENT, reader, "_id", DocValuesType.SORTED_NUMERIC, 1,0); SliceBuilder builder = new SliceBuilder(5, 10); - Query query = builder.toFilter(context, 0, 1); + Query query = builder.toFilter(null, createRequest(0), context, Version.CURRENT); assertThat(query, instanceOf(TermsSliceQuery.class)); - assertThat(builder.toFilter(context, 0, 1), equalTo(query)); + assertThat(builder.toFilter(null, createRequest(0), context, Version.CURRENT), equalTo(query)); try (IndexReader newReader = DirectoryReader.open(dir)) { when(context.getIndexReader()).thenReturn(newReader); - assertThat(builder.toFilter(context, 0, 1), equalTo(query)); + assertThat(builder.toFilter(null, createRequest(0), context, Version.CURRENT), equalTo(query)); } } + } + public void testToFilterRandom() throws IOException { + Directory dir = new RAMDirectory(); + try (IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())))) { + writer.commit(); + } try (IndexReader reader = DirectoryReader.open(dir)) { - MappedFieldType fieldType = new MappedFieldType() { - @Override - public MappedFieldType clone() { - return null; - } - - @Override - public String typeName() { - return null; - } - - @Override - public Query termQuery(Object value, @Nullable QueryShardContext context) { - return null; - } - - public Query existsQuery(QueryShardContext context) { - return null; - } - }; - fieldType.setName("field_doc_values"); - fieldType.setHasDocValues(true); - fieldType.setDocValuesType(DocValuesType.SORTED_NUMERIC); - when(context.fieldMapper("field_doc_values")).thenReturn(fieldType); - when(context.getIndexReader()).thenReturn(reader); - IndexNumericFieldData fd = mock(IndexNumericFieldData.class); - when(context.getForField(fieldType)).thenReturn(fd); - SliceBuilder builder = new SliceBuilder("field_doc_values", 5, 10); - Query query = builder.toFilter(context, 0, 1); + QueryShardContext context = + createShardContext(Version.CURRENT, reader, "field", DocValuesType.SORTED_NUMERIC, 1,0); + SliceBuilder builder = new SliceBuilder("field", 5, 10); + Query query = builder.toFilter(null, createRequest(0), context, Version.CURRENT); assertThat(query, instanceOf(DocValuesSliceQuery.class)); - - assertThat(builder.toFilter(context, 0, 1), equalTo(query)); + assertThat(builder.toFilter(null, createRequest(0), context, Version.CURRENT), equalTo(query)); try (IndexReader newReader = DirectoryReader.open(dir)) { when(context.getIndexReader()).thenReturn(newReader); - assertThat(builder.toFilter(context, 0, 1), equalTo(query)); + assertThat(builder.toFilter(null, createRequest(0), context, Version.CURRENT), equalTo(query)); } // numSlices > numShards @@ -226,7 +376,8 @@ public Query existsQuery(QueryShardContext context) { for (int i = 0; i < numSlices; i++) { for (int j = 0; j < numShards; j++) { SliceBuilder slice = new SliceBuilder("_id", i, numSlices); - Query q = slice.toFilter(context, j, numShards); + context = createShardContext(Version.CURRENT, reader, "_id", DocValuesType.SORTED, numShards, j); + Query q = slice.toFilter(null, createRequest(j), context, Version.CURRENT); if (q instanceof TermsSliceQuery || q instanceof MatchAllDocsQuery) { AtomicInteger count = numSliceMap.get(j); if (count == null) { @@ -250,12 +401,13 @@ public Query existsQuery(QueryShardContext context) { // numShards > numSlices numShards = randomIntBetween(4, 100); - numSlices = randomIntBetween(2, numShards-1); + numSlices = randomIntBetween(2, numShards - 1); List targetShards = new ArrayList<>(); for (int i = 0; i < numSlices; i++) { for (int j = 0; j < numShards; j++) { SliceBuilder slice = new SliceBuilder("_id", i, numSlices); - Query q = slice.toFilter(context, j, numShards); + context = createShardContext(Version.CURRENT, reader, "_id", DocValuesType.SORTED, numShards, j); + Query q = slice.toFilter(null, createRequest(j), context, Version.CURRENT); if (q instanceof MatchNoDocsQuery == false) { assertThat(q, instanceOf(MatchAllDocsQuery.class)); targetShards.add(j); @@ -271,7 +423,7 @@ public Query existsQuery(QueryShardContext context) { for (int i = 0; i < numSlices; i++) { for (int j = 0; j < numShards; j++) { SliceBuilder slice = new SliceBuilder("_id", i, numSlices); - Query q = slice.toFilter(context, j, numShards); + Query q = slice.toFilter(null, createRequest(j), context, Version.CURRENT); if (i == j) { assertThat(q, instanceOf(MatchAllDocsQuery.class)); } else { @@ -280,85 +432,35 @@ public Query existsQuery(QueryShardContext context) { } } } + } + public void testInvalidField() throws IOException { + Directory dir = new RAMDirectory(); + try (IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())))) { + writer.commit(); + } try (IndexReader reader = DirectoryReader.open(dir)) { - MappedFieldType fieldType = new MappedFieldType() { - @Override - public MappedFieldType clone() { - return null; - } - - @Override - public String typeName() { - return null; - } - - @Override - public Query termQuery(Object value, @Nullable QueryShardContext context) { - return null; - } - - public Query existsQuery(QueryShardContext context) { - return null; - } - }; - fieldType.setName("field_without_doc_values"); - when(context.fieldMapper("field_without_doc_values")).thenReturn(fieldType); - when(context.getIndexReader()).thenReturn(reader); - SliceBuilder builder = new SliceBuilder("field_without_doc_values", 5, 10); - IllegalArgumentException exc = - expectThrows(IllegalArgumentException.class, () -> builder.toFilter(context, 0, 1)); + QueryShardContext context = createShardContext(Version.CURRENT, reader, "field", null, 1,0); + SliceBuilder builder = new SliceBuilder("field", 5, 10); + IllegalArgumentException exc = expectThrows(IllegalArgumentException.class, + () -> builder.toFilter(null, createRequest(0), context, Version.CURRENT)); assertThat(exc.getMessage(), containsString("cannot load numeric doc values")); } } - public void testToFilterDeprecationMessage() throws IOException { Directory dir = new RAMDirectory(); try (IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())))) { writer.commit(); } - QueryShardContext context = mock(QueryShardContext.class); try (IndexReader reader = DirectoryReader.open(dir)) { - MappedFieldType fieldType = new MappedFieldType() { - @Override - public MappedFieldType clone() { - return null; - } - - @Override - public String typeName() { - return null; - } - - @Override - public Query termQuery(Object value, @Nullable QueryShardContext context) { - return null; - } - - public Query existsQuery(QueryShardContext context) { - return null; - } - }; - fieldType.setName("_uid"); - fieldType.setHasDocValues(false); - when(context.fieldMapper("_uid")).thenReturn(fieldType); - when(context.getIndexReader()).thenReturn(reader); - Settings settings = Settings.builder() - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_6_3_0) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 2) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .build(); - IndexMetaData indexState = IndexMetaData.builder("index").settings(settings).build(); - IndexSettings indexSettings = new IndexSettings(indexState, Settings.EMPTY); - when(context.getIndexSettings()).thenReturn(indexSettings); + QueryShardContext context = createShardContext(Version.V_6_3_0, reader, "_uid", null, 1,0); SliceBuilder builder = new SliceBuilder("_uid", 5, 10); - Query query = builder.toFilter(context, 0, 1); + Query query = builder.toFilter(null, createRequest(0), context, Version.CURRENT); assertThat(query, instanceOf(TermsSliceQuery.class)); - assertThat(builder.toFilter(context, 0, 1), equalTo(query)); + assertThat(builder.toFilter(null, createRequest(0), context, Version.CURRENT), equalTo(query)); assertWarnings("Computing slices on the [_uid] field is deprecated for 6.x indices, use [_id] instead"); } - } public void testSerializationBackcompat() throws IOException { @@ -375,4 +477,35 @@ public void testSerializationBackcompat() throws IOException { SliceBuilder::new, Version.V_6_3_0); assertEquals(sliceBuilder, copy63); } + + public void testToFilterWithRouting() throws IOException { + Directory dir = new RAMDirectory(); + try (IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())))) { + writer.commit(); + } + ClusterService clusterService = mock(ClusterService.class); + ClusterState state = mock(ClusterState.class); + when(state.metaData()).thenReturn(MetaData.EMPTY_META_DATA); + when(clusterService.state()).thenReturn(state); + OperationRouting routing = mock(OperationRouting.class); + GroupShardsIterator it = new GroupShardsIterator<>( + Collections.singletonList( + new SearchShardIterator(null, new ShardId("index", "index", 1), null, null) + ) + ); + when(routing.searchShards(any(), any(), any(), any())).thenReturn(it); + when(clusterService.operationRouting()).thenReturn(routing); + when(clusterService.getSettings()).thenReturn(Settings.EMPTY); + try (IndexReader reader = DirectoryReader.open(dir)) { + QueryShardContext context = createShardContext(Version.CURRENT, reader, "field", DocValuesType.SORTED, 5, 0); + SliceBuilder builder = new SliceBuilder("field", 6, 10); + String[] routings = new String[] { "foo" }; + Query query = builder.toFilter(clusterService, createRequest(1, routings, null), context, Version.CURRENT); + assertEquals(new DocValuesSliceQuery("field", 6, 10), query); + query = builder.toFilter(clusterService, createRequest(1, Strings.EMPTY_ARRAY, "foo"), context, Version.CURRENT); + assertEquals(new DocValuesSliceQuery("field", 6, 10), query); + query = builder.toFilter(clusterService, createRequest(1, Strings.EMPTY_ARRAY, "foo"), context, Version.V_6_2_0); + assertEquals(new DocValuesSliceQuery("field", 1, 2), query); + } + } }