Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.transport.Transport;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -62,6 +64,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final long clusterStateVersion;
private final Map<String, AliasFilter> aliasFilter;
private final Map<String, Float> concreteIndexBoosts;
private final Map<String, Set<String>> indexRoutings;
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
private final Object shardFailuresMutex = new Object();
private final AtomicInteger successfulOps = new AtomicInteger();
Expand All @@ -72,6 +75,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
BiFunction<String, String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
Map<String, Set<String>> indexRoutings,
Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
Expand All @@ -89,6 +93,7 @@ protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportS
this.clusterStateVersion = clusterStateVersion;
this.concreteIndexBoosts = concreteIndexBoosts;
this.aliasFilter = aliasFilter;
this.indexRoutings = indexRoutings;
this.results = resultConsumer;
this.clusters = clusters;
}
Expand Down Expand Up @@ -128,17 +133,17 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
onPhaseFailure(currentPhase, "all shards failed", cause);
} else {
Boolean allowPartialResults = request.allowPartialSearchResults();
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
if (allowPartialResults == false && shardFailures.get() != null ){
if (logger.isDebugEnabled()) {
final ShardOperationFailedException[] shardSearchFailures = ExceptionsHelper.groupBy(buildShardFailures());
Throwable cause = shardSearchFailures.length == 0 ? null :
ElasticsearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
logger.debug(() -> new ParameterizedMessage("{} shards failed for phase: [{}]",
logger.debug(() -> new ParameterizedMessage("{} shards failed for phase: [{}]",
shardSearchFailures.length, getName()), cause);
}
onPhaseFailure(currentPhase, "Partial shards failure", null);
} else {
onPhaseFailure(currentPhase, "Partial shards failure", null);
} else {
if (logger.isTraceEnabled()) {
final String resultsFrom = results.getSuccessfulResults()
.map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(","));
Expand Down Expand Up @@ -271,14 +276,14 @@ public final SearchRequest getRequest() {

@Override
public final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {

ShardSearchFailure[] failures = buildShardFailures();
Boolean allowPartialResults = request.allowPartialSearchResults();
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
if (allowPartialResults == false && failures.length > 0){
raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
}
raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
}

return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(),
skippedOps.get(), buildTookInMillis(), failures, clusters);
}
Expand Down Expand Up @@ -318,8 +323,11 @@ public final ShardSearchTransportRequest buildShardSearchRequest(SearchShardIter
AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
assert filter != null;
float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
String indexName = shardIt.shardId().getIndex().getName();
final String[] routings = indexRoutings.getOrDefault(indexName, Collections.emptySet())
.toArray(new String[0]);
return new ShardSearchTransportRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(),
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), clusterAlias);
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), clusterAlias, routings);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.transport.Transport;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
Expand All @@ -47,6 +48,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc
CanMatchPreFilterSearchPhase(Logger logger, SearchTransportService searchTransportService,
BiFunction<String, String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
Map<String, Set<String>> indexRoutings,
Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
Expand All @@ -56,9 +58,9 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc
* We set max concurrent shard requests to the number of shards to otherwise avoid deep recursing that would occur if the local node
* is the coordinating node for the query, holds all the shards for the request, and there are a lot of shards.
*/
super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request,
listener, shardsIts, timeProvider, clusterStateVersion, task, new BitSetSearchPhaseResults(shardsIts.size()), shardsIts.size(),
clusters);
super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings,
executor, request, listener, shardsIts, timeProvider, clusterStateVersion, task,
new BitSetSearchPhaseResults(shardsIts.size()), shardsIts.size(), clusters);
this.phaseFactory = phaseFactory;
this.shardsIts = shardsIts;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public final void run() throws IOException {
if (shardsIts.size() > 0) {
int maxConcurrentShardRequests = Math.min(this.maxConcurrentShardRequests, shardsIts.size());
final boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests);
assert success;
assert success;
assert request.allowPartialSearchResults() != null : "SearchRequest missing setting for allowPartialSearchResults";
if (request.allowPartialSearchResults() == false) {
final StringBuilder missingShards = new StringBuilder();
Expand All @@ -140,7 +140,7 @@ public final void run() throws IOException {
final SearchShardIterator shardRoutings = shardsIts.get(index);
if (shardRoutings.size() == 0) {
if(missingShards.length() >0 ){
missingShards.append(", ");
missingShards.append(", ");
}
missingShards.append(shardRoutings.shardId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.transport.Transport;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;

Expand All @@ -37,11 +38,13 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction

SearchDfsQueryThenFetchAsyncAction(final Logger logger, final SearchTransportService searchTransportService,
final BiFunction<String, String, Transport.Connection> nodeIdToConnection, final Map<String, AliasFilter> aliasFilter,
final Map<String, Float> concreteIndexBoosts, final SearchPhaseController searchPhaseController, final Executor executor,
final Map<String, Float> concreteIndexBoosts, final Map<String, Set<String>> indexRoutings,
final SearchPhaseController searchPhaseController, final Executor executor,
final SearchRequest request, final ActionListener<SearchResponse> listener,
final GroupShardsIterator<SearchShardIterator> shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider,
final long clusterStateVersion, final SearchTask task, SearchResponse.Clusters clusters) {
super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener,
super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings,
executor, request, listener,
shardsIts, timeProvider, clusterStateVersion, task, new ArraySearchPhaseResults<>(shardsIts.size()),
request.getMaxConcurrentShardRequests(), clusters);
this.searchPhaseController = searchPhaseController;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.transport.Transport;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;

Expand All @@ -37,13 +38,14 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<Se

SearchQueryThenFetchAsyncAction(final Logger logger, final SearchTransportService searchTransportService,
final BiFunction<String, String, Transport.Connection> nodeIdToConnection, final Map<String, AliasFilter> aliasFilter,
final Map<String, Float> concreteIndexBoosts, final SearchPhaseController searchPhaseController, final Executor executor,
final Map<String, Float> concreteIndexBoosts, final Map<String, Set<String>> indexRoutings,
final SearchPhaseController searchPhaseController, final Executor executor,
final SearchRequest request, final ActionListener<SearchResponse> listener,
final GroupShardsIterator<SearchShardIterator> shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider,
long clusterStateVersion, SearchTask task, SearchResponse.Clusters clusters) {
super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener,
shardsIts, timeProvider, clusterStateVersion, task, searchPhaseController.newSearchPhaseResults(request, shardsIts.size()),
request.getMaxConcurrentShardRequests(), clusters);
super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings,
executor, request, listener, shardsIts, timeProvider, clusterStateVersion, task,
searchPhaseController.newSearchPhaseResults(request, shardsIts.size()), request.getMaxConcurrentShardRequests(), clusters);
this.searchPhaseController = searchPhaseController;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
Map<String, AliasFilter> aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap);
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
searchRequest.indices());
routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap);
String[] concreteIndices = new String[indices.length];
for (int i = 0; i < indices.length; i++) {
concreteIndices[i] = indices[i].getName();
Expand Down Expand Up @@ -350,7 +351,7 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
}
boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators);
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener, preFilterSearchShards, clusters).start();
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();
}

private boolean shouldPreFilterSearchShards(SearchRequest searchRequest, GroupShardsIterator<SearchShardIterator> shardIterators) {
Expand Down Expand Up @@ -380,17 +381,20 @@ private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchReque
GroupShardsIterator<SearchShardIterator> shardIterators,
SearchTimeProvider timeProvider,
BiFunction<String, String, Transport.Connection> connectionLookup,
long clusterStateVersion, Map<String, AliasFilter> aliasFilter,
long clusterStateVersion,
Map<String, AliasFilter> aliasFilter,
Map<String, Float> concreteIndexBoosts,
ActionListener<SearchResponse> listener, boolean preFilter,
Map<String, Set<String>> indexRoutings,
ActionListener<SearchResponse> listener,
boolean preFilter,
SearchResponse.Clusters clusters) {
Executor executor = threadPool.executor(ThreadPool.Names.SEARCH);
if (preFilter) {
return new CanMatchPreFilterSearchPhase(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, executor, searchRequest, listener, shardIterators,
aliasFilter, concreteIndexBoosts, indexRoutings, executor, searchRequest, listener, shardIterators,
timeProvider, clusterStateVersion, task, (iter) -> {
AbstractSearchAsyncAction action = searchAsyncAction(task, searchRequest, iter, timeProvider, connectionLookup,
clusterStateVersion, aliasFilter, concreteIndexBoosts, listener, false, clusters);
clusterStateVersion, aliasFilter, concreteIndexBoosts, indexRoutings, listener, false, clusters);
return new SearchPhase(action.getName()) {
@Override
public void run() throws IOException {
Expand All @@ -403,14 +407,14 @@ public void run() throws IOException {
switch (searchRequest.searchType()) {
case DFS_QUERY_THEN_FETCH:
searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators,
timeProvider, clusterStateVersion, task, clusters);
aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, searchRequest, listener,
shardIterators, timeProvider, clusterStateVersion, task, clusters);
break;
case QUERY_AND_FETCH:
case QUERY_THEN_FETCH:
searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators,
timeProvider, clusterStateVersion, task, clusters);
aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, searchRequest, listener,
shardIterators, timeProvider, clusterStateVersion, task, clusters);
break;
default:
throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

/**
* A simple {@link ShardsIterator} that iterates a list or sub-list of
* {@link ShardRouting shard routings}.
* {@link ShardRouting shard indexRoutings}.
*/
public class PlainShardsIterator implements ShardsIterator {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

/**
* {@link ShardRouting} immutably encapsulates information about shard
* routings like id, state, version, etc.
* indexRoutings like id, state, version, etc.
*/
public final class ShardRouting implements Writeable, ToXContentObject {

Expand Down Expand Up @@ -477,7 +477,7 @@ public boolean isRelocationTargetOf(ShardRouting other) {
"ShardRouting is a relocation target but current node id isn't equal to source relocating node. This [" + this + "], other [" + other + "]";

assert b == false || this.shardId.equals(other.shardId) :
"ShardRouting is a relocation target but both routings are not of the same shard id. This [" + this + "], other [" + other + "]";
"ShardRouting is a relocation target but both indexRoutings are not of the same shard id. This [" + this + "], other [" + other + "]";

assert b == false || this.primary == other.primary :
"ShardRouting is a relocation target but primary flag is different. This [" + this + "], target [" + other + "]";
Expand All @@ -504,7 +504,7 @@ public boolean isRelocationSourceOf(ShardRouting other) {
"ShardRouting is a relocation source but relocating node isn't equal to other's current node. This [" + this + "], other [" + other + "]";

assert b == false || this.shardId.equals(other.shardId) :
"ShardRouting is a relocation source but both routings are not of the same shard. This [" + this + "], target [" + other + "]";
"ShardRouting is a relocation source but both indexRoutings are not of the same shard. This [" + this + "], target [" + other + "]";

assert b == false || this.primary == other.primary :
"ShardRouting is a relocation source but primary flag is different. This [" + this + "], target [" + other + "]";
Expand Down
Loading