diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java index 3601947950335..2b3c8a7bbcc33 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java @@ -200,7 +200,7 @@ protected ShardValidateQueryResponse shardOperation(ShardValidateQueryRequest re } catch (QueryShardException|ParsingException e) { valid = false; error = e.getDetailedMessage(); - } catch (AssertionError|IOException e) { + } catch (AssertionError e) { valid = false; error = e.getMessage(); } finally { @@ -210,7 +210,7 @@ protected ShardValidateQueryResponse shardOperation(ShardValidateQueryRequest re return new ShardValidateQueryResponse(request.shardId(), valid, explanation, error); } - private String explain(SearchContext context, boolean rewritten) throws IOException { + private String explain(SearchContext context, boolean rewritten) { Query query = context.query(); if (rewritten && query instanceof MatchNoDocsQuery) { return context.parsedQuery().query().toString(); diff --git a/server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java b/server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java index 5ea178f595acf..c1695e13864cb 100644 --- a/server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java +++ b/server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java @@ -152,4 +152,11 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) { clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(), request.request().preference() ); } + + @Override + protected String getExecutor(ExplainRequest request, ShardId shardId) { + IndexService indexService = searchService.getIndicesService().indexServiceSafe(shardId.getIndex()); + return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request, + shardId); + } } diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index 63d3d30e1e27f..69753bdd9795e 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -111,4 +111,11 @@ protected GetResponse shardOperation(GetRequest request, ShardId shardId) { protected GetResponse newResponse() { return new GetResponse(); } + + @Override + protected String getExecutor(GetRequest request, ShardId shardId) { + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request, + shardId); + } } diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index e0a6cd827863a..7a7c02ad476e7 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -102,4 +102,11 @@ protected MultiGetShardResponse shardOperation(MultiGetShardRequest request, Sha return response; } + + @Override + protected String getExecutor(MultiGetShardRequest request, ShardId shardId) { + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request, + shardId); + } } diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index 0873ff40f7500..82c8f6b815ab4 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.search.SearchService; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.transport.Transport; @@ -40,7 +41,7 @@ * which allows to fan out to more shards at the same time without running into rejections even if we are hitting a * large portion of the clusters indices. */ -final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction { +final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction { private final Function, SearchPhase> phaseFactory; private final GroupShardsIterator shardsIts; @@ -67,13 +68,13 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction listener) { + SearchActionListener listener) { getSearchTransport().sendCanMatch(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()), buildShardSearchRequest(shardIt), getTask(), listener); } @Override - protected SearchPhase getNextPhase(SearchPhaseResults results, + protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { return phaseFactory.apply(getIterator((BitSetSearchPhaseResults) results, shardsIts)); @@ -100,7 +101,7 @@ private GroupShardsIterator getIterator(BitSetSearchPhaseRe } private static final class BitSetSearchPhaseResults extends InitialSearchPhase. - SearchPhaseResults { + SearchPhaseResults { private final FixedBitSet possibleMatches; private int numPossibleMatches; @@ -111,7 +112,7 @@ private static final class BitSetSearchPhaseResults extends InitialSearchPhase. } @Override - void consumeResult(SearchTransportService.CanMatchResponse result) { + void consumeResult(SearchService.CanMatchResponse result) { if (result.canMatch()) { consumeShardFailure(result.getShardIndex()); } @@ -139,7 +140,7 @@ synchronized FixedBitSet getPossibleMatches() { } @Override - Stream getSuccessfulResults() { + Stream getSuccessfulResults() { return Stream.empty(); } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 9db297f4b9247..fd43a948ee5fb 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -23,7 +23,7 @@ import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.OriginalIndices; -import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.HandledTransportAction.ChannelActionListener; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; @@ -112,9 +112,9 @@ public void sendFreeContext(Transport.Connection connection, long contextId, fin } public void sendCanMatch(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, final - ActionListener listener) { + ActionListener listener) { transportService.sendChildRequest(connection, QUERY_CAN_MATCH_NAME, request, task, - TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, CanMatchResponse::new)); + TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, SearchService.CanMatchResponse::new)); } public void sendClearAllScrollContexts(Transport.Connection connection, final ActionListener listener) { @@ -349,83 +349,54 @@ public void onFailure(Exception e) { transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new, (request, channel, task) -> { - searchService.executeQueryPhase(request, (SearchTask) task, new HandledTransportAction.ChannelActionListener<>( + searchService.executeQueryPhase(request, (SearchTask) task, new ChannelActionListener<>( channel, QUERY_ACTION_NAME, request)); }); TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME, (request) -> ((ShardSearchRequest)request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new); - transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SEARCH, QuerySearchRequest::new, + transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SAME, QuerySearchRequest::new, (request, channel, task) -> { - QuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeQueryPhase(request, (SearchTask)task, new ChannelActionListener<>(channel, QUERY_ID_ACTION_NAME, + request)); }); TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, QuerySearchResult::new); - transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new, + transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, InternalScrollSearchRequest::new, (request, channel, task) -> { - ScrollQuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeQueryPhase(request, (SearchTask)task, new ChannelActionListener<>(channel, QUERY_SCROLL_ACTION_NAME, + request)); }); TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new); - transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new, + transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, InternalScrollSearchRequest::new, (request, channel, task) -> { - ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel, + QUERY_FETCH_SCROLL_ACTION_NAME, request)); }); TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, ScrollQueryFetchSearchResult::new); - transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, ShardFetchRequest::new, + transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, ShardFetchRequest::new, (request, channel, task) -> { - FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel, + FETCH_ID_SCROLL_ACTION_NAME, request)); }); TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, FetchSearchResult::new); - transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SEARCH, true, true, ShardFetchSearchRequest::new, + transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SAME, true, true, ShardFetchSearchRequest::new, (request, channel, task) -> { - FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel, FETCH_ID_ACTION_NAME, + request)); }); TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new); // this is cheap, it does not fetch during the rewrite phase, so we can let it quickly execute on a networking thread transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new, (request, channel, task) -> { - boolean canMatch = searchService.canMatch(request); - channel.sendResponse(new CanMatchResponse(canMatch)); + searchService.canMatch(request, new ChannelActionListener<>(channel, QUERY_CAN_MATCH_NAME, request)); }); TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME, - (Supplier) CanMatchResponse::new); - } - - public static final class CanMatchResponse extends SearchPhaseResult { - private boolean canMatch; - - public CanMatchResponse() { - } - - public CanMatchResponse(boolean canMatch) { - this.canMatch = canMatch; - } - - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - canMatch = in.readBoolean(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBoolean(canMatch); - } - - public boolean canMatch() { - return canMatch; - } + (Supplier) SearchService.CanMatchResponse::new); } diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java index 3045f6ea43aa1..22c4a70b0ea55 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java @@ -36,6 +36,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportChannel; @@ -57,6 +58,7 @@ public abstract class TransportBroadcastAction { @Override public void messageReceived(ShardRequest request, TransportChannel channel, Task task) throws Exception { - channel.sendResponse(shardOperation(request, task)); + asyncShardOperation(request, task, new ActionListener() { + @Override + public void onResponse(ShardResponse response) { + try { + channel.sendResponse(response); + } catch (Exception e) { + onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + try { + channel.sendResponse(e); + } catch (Exception e1) { + logger.warn(() -> new ParameterizedMessage( + "Failed to send error response for action [{}] and request [{}]", actionName, request), e1); + } + } + }); } } + + protected void asyncShardOperation(ShardRequest request, Task task, ActionListener listener) { + transportService.getThreadPool().executor(getExecutor(request)).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + protected void doRun() throws Exception { + listener.onResponse(shardOperation(request, task)); + } + }); + } + + protected String getExecutor(ShardRequest request) { + return shardExecutor; + } + } diff --git a/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java b/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java index 7a83b0c455da4..436089ab3be73 100644 --- a/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.ClusterState; @@ -49,7 +50,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.function.Supplier; import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException; @@ -66,8 +66,8 @@ public abstract class TransportSingleShardAction li protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException; protected void asyncShardOperation(Request request, ShardId shardId, ActionListener listener) throws IOException { - threadPool.executor(this.executor).execute(new AbstractRunnable() { + threadPool.executor(getExecutor(request, shardId)).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { listener.onFailure(e); @@ -274,25 +274,7 @@ private class TransportHandler implements TransportRequestHandler { @Override public void messageReceived(Request request, final TransportChannel channel, Task task) throws Exception { // if we have a local operation, execute it on a thread since we don't spawn - execute(request, new ActionListener() { - @Override - public void onResponse(Response result) { - try { - channel.sendResponse(result); - } catch (Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - try { - channel.sendResponse(e); - } catch (Exception e1) { - logger.warn("failed to send response for get", e1); - } - } - }); + execute(request, new HandledTransportAction.ChannelActionListener<>(channel, actionName, request)); } } @@ -303,25 +285,8 @@ public void messageReceived(final Request request, final TransportChannel channe if (logger.isTraceEnabled()) { logger.trace("executing [{}] on shard [{}]", request, request.internalShardId); } - asyncShardOperation(request, request.internalShardId, new ActionListener() { - @Override - public void onResponse(Response response) { - try { - channel.sendResponse(response); - } catch (IOException e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - try { - channel.sendResponse(e); - } catch (IOException e1) { - throw new UncheckedIOException(e1); - } - } - }); + asyncShardOperation(request, request.internalShardId, new HandledTransportAction.ChannelActionListener<>(channel, + transportShardAction, request)); } } /** @@ -344,4 +309,8 @@ public String concreteIndex() { return concreteIndex; } } + + protected String getExecutor(Request request, ShardId shardId) { + return executor; + } } diff --git a/server/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java b/server/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java index f1641fdd25c98..10fd954354ba8 100644 --- a/server/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java +++ b/server/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java @@ -96,4 +96,11 @@ protected MultiTermVectorsShardResponse shardOperation(MultiTermVectorsShardRequ return response; } + + @Override + protected String getExecutor(MultiTermVectorsShardRequest request, ShardId shardId) { + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request, + shardId); + } } diff --git a/server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java b/server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java index d2a6055bbe75a..e0babdb6c4359 100644 --- a/server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java +++ b/server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java @@ -113,4 +113,11 @@ protected TermVectorsResponse shardOperation(TermVectorsRequest request, ShardId protected TermVectorsResponse newResponse() { return new TermVectorsResponse(); } + + @Override + protected String getExecutor(TermVectorsRequest request, ShardId shardId) { + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request, + shardId); + } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index ae8529af5b53e..c4e0257c95395 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -142,6 +142,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING, IndexSettings.INDEX_SEARCH_IDLE_AFTER, + IndexSettings.INDEX_SEARCH_THROTTLED, IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY, FieldMapper.IGNORE_MALFORMED_SETTING, FieldMapper.COERCE_SETTING, diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index 9801cc3e26bb1..3a55eb8961172 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -277,6 +277,12 @@ public final class IndexSettings { return s; }, Property.Dynamic, Property.IndexScope); + /** + * Marks an index to be searched throttled. This means that never more than one shard of such an index will be searched concurrently + */ + public static final Setting INDEX_SEARCH_THROTTLED = Setting.boolSetting("index.search.throttled", false, + Property.IndexScope, Property.PrivateIndex, Property.Dynamic); + private final Index index; private final Version version; private final Logger logger; @@ -319,6 +325,7 @@ public final class IndexSettings { private volatile int maxAnalyzedOffset; private volatile int maxTermsCount; private volatile String defaultPipeline; + private volatile boolean searchThrottled; /** * The maximum number of refresh listeners allows on this shard. @@ -402,6 +409,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti this.indexMetaData = indexMetaData; numberOfShards = settings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, null); + this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings); this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings); this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings); this.queryStringAllowLeadingWildcard = QUERY_STRING_ALLOW_LEADING_WILDCARD.get(nodeSettings); @@ -478,6 +486,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength); scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline); scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations); + scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THROTTLED, this::setSearchThrottled); } private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; } @@ -879,4 +888,16 @@ private void setSoftDeleteRetentionOperations(long ops) { public long getSoftDeleteRetentionOperations() { return this.softDeleteRetentionOperations; } + + /** + * Returns true if the this index should be searched throttled ie. using the + * {@link org.elasticsearch.threadpool.ThreadPool.Names#SEARCH_THROTTLED} thread-pool + */ + public boolean isSearchThrottled() { + return searchThrottled; + } + + private void setSearchThrottled(boolean searchThrottled) { + this.searchThrottled = searchThrottled; + } } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 5cb9f81626c94..d8829bd11d386 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -30,6 +30,8 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -106,8 +108,10 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongSupplier; +import java.util.function.Supplier; import static org.elasticsearch.common.unit.TimeValue.timeValueHours; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; @@ -344,7 +348,21 @@ public void onFailure(Exception e) { }); } - SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException { + private void runAsync(long id, Supplier executable, ActionListener listener) { + getExecutor(id).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + protected void doRun() { + listener.onResponse(executable.get()); + } + }); + } + + private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException { final SearchContext context = createAndPutContext(request); final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); context.incRef(); @@ -405,59 +423,63 @@ private QueryFetchSearchResult executeFetchPhase(SearchContext context, SearchOp return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); } - public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest request, SearchTask task) { - final SearchContext context = findContext(request.id(), request); - SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); - context.incRef(); - try { - context.setTask(task); - operationListener.onPreQueryPhase(context); - long time = System.nanoTime(); - contextProcessing(context); - processScroll(request, context); - queryPhase.execute(context); - contextProcessedSuccessfully(context); - operationListener.onQueryPhase(context, System.nanoTime() - time); - return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget()); - } catch (Exception e) { - operationListener.onFailedQueryPhase(context); - logger.trace("Query phase failed", e); - processFailure(context, e); - throw ExceptionsHelper.convertToRuntime(e); - } finally { - cleanContext(context); - } + public void executeQueryPhase(InternalScrollSearchRequest request, SearchTask task, ActionListener listener) { + runAsync(request.id(), () -> { + final SearchContext context = findContext(request.id(), request); + SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); + context.incRef(); + try { + context.setTask(task); + operationListener.onPreQueryPhase(context); + long time = System.nanoTime(); + contextProcessing(context); + processScroll(request, context); + queryPhase.execute(context); + contextProcessedSuccessfully(context); + operationListener.onQueryPhase(context, System.nanoTime() - time); + return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget()); + } catch (Exception e) { + operationListener.onFailedQueryPhase(context); + logger.trace("Query phase failed", e); + processFailure(context, e); + throw ExceptionsHelper.convertToRuntime(e); + } finally { + cleanContext(context); + } + }, listener); } - public QuerySearchResult executeQueryPhase(QuerySearchRequest request, SearchTask task) { - final SearchContext context = findContext(request.id(), request); - context.setTask(task); - IndexShard indexShard = context.indexShard(); - SearchOperationListener operationListener = indexShard.getSearchOperationListener(); - context.incRef(); - try { - contextProcessing(context); - context.searcher().setAggregatedDfs(request.dfs()); + public void executeQueryPhase(QuerySearchRequest request, SearchTask task, ActionListener listener) { + runAsync(request.id(), () -> { + final SearchContext context = findContext(request.id(), request); + context.setTask(task); + IndexShard indexShard = context.indexShard(); + SearchOperationListener operationListener = indexShard.getSearchOperationListener(); + context.incRef(); + try { + contextProcessing(context); + context.searcher().setAggregatedDfs(request.dfs()); - operationListener.onPreQueryPhase(context); - long time = System.nanoTime(); - queryPhase.execute(context); - if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) { - // no hits, we can release the context since there will be no fetch phase - freeContext(context.id()); - } else { - contextProcessedSuccessfully(context); + operationListener.onPreQueryPhase(context); + long time = System.nanoTime(); + queryPhase.execute(context); + if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) { + // no hits, we can release the context since there will be no fetch phase + freeContext(context.id()); + } else { + contextProcessedSuccessfully(context); + } + operationListener.onQueryPhase(context, System.nanoTime() - time); + return context.queryResult(); + } catch (Exception e) { + operationListener.onFailedQueryPhase(context); + logger.trace("Query phase failed", e); + processFailure(context, e); + throw ExceptionsHelper.convertToRuntime(e); + } finally { + cleanContext(context); } - operationListener.onQueryPhase(context, System.nanoTime() - time); - return context.queryResult(); - } catch (Exception e) { - operationListener.onFailedQueryPhase(context); - logger.trace("Query phase failed", e); - processFailure(context, e); - throw ExceptionsHelper.convertToRuntime(e); - } finally { - cleanContext(context); - } + }, listener); } private boolean fetchPhaseShouldFreeContext(SearchContext context) { @@ -470,66 +492,83 @@ private boolean fetchPhaseShouldFreeContext(SearchContext context) { } } - public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchRequest request, SearchTask task) { - final SearchContext context = findContext(request.id(), request); - context.incRef(); - try { - context.setTask(task); - contextProcessing(context); - SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); - processScroll(request, context); - operationListener.onPreQueryPhase(context); - final long time = System.nanoTime(); + final Executor getExecutor(long id) { + SearchContext context = activeContexts.get(id); + if (context == null) { + throw new SearchContextMissingException(id); + } + return getExecutor(context.indexShard()); + } + + private Executor getExecutor(IndexShard indexShard) { + assert indexShard != null; + return threadPool.executor(indexShard.indexSettings().isSearchThrottled() ? Names.SEARCH_THROTTLED : Names.SEARCH); + } + + public void executeFetchPhase(InternalScrollSearchRequest request, SearchTask task, + ActionListener listener) { + runAsync(request.id(), () -> { + final SearchContext context = findContext(request.id(), request); + context.incRef(); try { - queryPhase.execute(context); + context.setTask(task); + contextProcessing(context); + SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); + processScroll(request, context); + operationListener.onPreQueryPhase(context); + final long time = System.nanoTime(); + try { + queryPhase.execute(context); + } catch (Exception e) { + operationListener.onFailedQueryPhase(context); + throw ExceptionsHelper.convertToRuntime(e); + } + long afterQueryTime = System.nanoTime(); + operationListener.onQueryPhase(context, afterQueryTime - time); + QueryFetchSearchResult fetchSearchResult = executeFetchPhase(context, operationListener, afterQueryTime); + return new ScrollQueryFetchSearchResult(fetchSearchResult, + context.shardTarget()); } catch (Exception e) { - operationListener.onFailedQueryPhase(context); + logger.trace("Fetch phase failed", e); + processFailure(context, e); throw ExceptionsHelper.convertToRuntime(e); + } finally { + cleanContext(context); } - long afterQueryTime = System.nanoTime(); - operationListener.onQueryPhase(context, afterQueryTime - time); - QueryFetchSearchResult fetchSearchResult = executeFetchPhase(context, operationListener, afterQueryTime); - - return new ScrollQueryFetchSearchResult(fetchSearchResult, - context.shardTarget()); - } catch (Exception e) { - logger.trace("Fetch phase failed", e); - processFailure(context, e); - throw ExceptionsHelper.convertToRuntime(e); - } finally { - cleanContext(context); - } + }, listener); } - public FetchSearchResult executeFetchPhase(ShardFetchRequest request, SearchTask task) { - final SearchContext context = findContext(request.id(), request); - final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); - context.incRef(); - try { - context.setTask(task); - contextProcessing(context); - if (request.lastEmittedDoc() != null) { - context.scrollContext().lastEmittedDoc = request.lastEmittedDoc(); - } - context.docIdsToLoad(request.docIds(), 0, request.docIdsSize()); - operationListener.onPreFetchPhase(context); - long time = System.nanoTime(); - fetchPhase.execute(context); - if (fetchPhaseShouldFreeContext(context)) { - freeContext(request.id()); - } else { - contextProcessedSuccessfully(context); + public void executeFetchPhase(ShardFetchRequest request, SearchTask task, ActionListener listener) { + runAsync(request.id(), () -> { + final SearchContext context = findContext(request.id(), request); + final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); + context.incRef(); + try { + context.setTask(task); + contextProcessing(context); + if (request.lastEmittedDoc() != null) { + context.scrollContext().lastEmittedDoc = request.lastEmittedDoc(); + } + context.docIdsToLoad(request.docIds(), 0, request.docIdsSize()); + operationListener.onPreFetchPhase(context); + long time = System.nanoTime(); + fetchPhase.execute(context); + if (fetchPhaseShouldFreeContext(context)) { + freeContext(request.id()); + } else { + contextProcessedSuccessfully(context); + } + operationListener.onFetchPhase(context, System.nanoTime() - time); + return context.fetchResult(); + } catch (Exception e) { + operationListener.onFailedFetchPhase(context); + logger.trace("Fetch phase failed", e); + processFailure(context, e); + throw ExceptionsHelper.convertToRuntime(e); + } finally { + cleanContext(context); } - operationListener.onFetchPhase(context, System.nanoTime() - time); - return context.fetchResult(); - } catch (Exception e) { - operationListener.onFailedFetchPhase(context); - logger.trace("Fetch phase failed", e); - processFailure(context, e); - throw ExceptionsHelper.convertToRuntime(e); - } finally { - cleanContext(context); - } + }, listener); } private SearchContext findContext(long id, TransportRequest request) throws SearchContextMissingException { @@ -985,6 +1024,15 @@ public boolean canMatch(ShardSearchRequest request) throws IOException { } } + + public void canMatch(ShardSearchRequest request, ActionListener listener) { + try { + listener.onResponse(new CanMatchResponse(canMatch(request))); + } catch (IOException e) { + listener.onFailure(e); + } + } + /** * Returns true iff the given search source builder can be early terminated by rewriting to a match none query. Or in other words * if the execution of a the search request can be early terminated without executing it. This is for instance not possible if @@ -1009,31 +1057,27 @@ public static boolean canRewriteToMatchNone(SearchSourceBuilder source) { * The action listener is guaranteed to be executed on the search thread-pool */ private void rewriteShardRequest(ShardSearchRequest request, ActionListener listener) { + IndexShard shard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id()); + Executor executor = getExecutor(shard); ActionListener actionListener = ActionListener.wrap(r -> - threadPool.executor(Names.SEARCH).execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - - @Override - protected void doRun() throws Exception { - listener.onResponse(request); - } - }), listener::onFailure); - IndexShard shardOrNull = indicesService.getShardOrNull(request.shardId()); - if (shardOrNull != null) { // now we need to check if there is a pending refresh and register - ActionListener finalListener = actionListener; - actionListener = ActionListener.wrap(r -> - shardOrNull.awaitShardSearchActive(b -> finalListener.onResponse(r)), finalListener::onFailure); - } + shard.awaitShardSearchActive(b -> + executor.execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + protected void doRun() { + listener.onResponse(request); + } + }) + ), listener::onFailure); // we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as // AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not // adding a lot of overhead Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis), actionListener); - - } /** @@ -1050,4 +1094,31 @@ public IndicesService getIndicesService() { public InternalAggregation.ReduceContext createReduceContext(boolean finalReduce) { return new InternalAggregation.ReduceContext(bigArrays, scriptService, multiBucketConsumerService.create(), finalReduce); } + + public static final class CanMatchResponse extends SearchPhaseResult { + private boolean canMatch; + + public CanMatchResponse() { + } + + public CanMatchResponse(boolean canMatch) { + this.canMatch = canMatch; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + canMatch = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(canMatch); + } + + public boolean canMatch() { + return canMatch; + } + } } diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 2d3be2435b401..ecf311bc4b91d 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -71,6 +71,7 @@ public static class Names { public static final String ANALYZE = "analyze"; public static final String WRITE = "write"; public static final String SEARCH = "search"; + public static final String SEARCH_THROTTLED = "search_throttled"; public static final String MANAGEMENT = "management"; public static final String FLUSH = "flush"; public static final String REFRESH = "refresh"; @@ -135,6 +136,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.FORCE_MERGE, ThreadPoolType.FIXED); map.put(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING); map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING); + map.put(Names.SEARCH_THROTTLED, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE); THREAD_POOL_TYPES = Collections.unmodifiableMap(map); } @@ -175,6 +177,8 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... customBui builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16)); builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000)); + builders.put(Names.SEARCH_THROTTLED, new AutoQueueAdjustingExecutorBuilder(settings, + Names.SEARCH_THROTTLED, 1, 100, 100, 100, 200)); builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5))); // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded // the assumption here is that the listeners should be very lightweight on the listeners side diff --git a/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java index 2a0fa6c7ce134..2798d66160044 100644 --- a/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ShardSearchTransportRequest; @@ -64,8 +65,8 @@ public void testFilterShards() throws InterruptedException { @Override public void sendCanMatch(Transport.Connection connection, ShardSearchTransportRequest request, SearchTask task, - ActionListener listener) { - new Thread(() -> listener.onResponse(new CanMatchResponse(request.shardId().id() == 0 ? shard1 : + ActionListener listener) { + new Thread(() -> listener.onResponse(new SearchService.CanMatchResponse(request.shardId().id() == 0 ? shard1 : shard2))).start(); } }; @@ -123,14 +124,14 @@ public void testFilterWithFailure() throws InterruptedException { @Override public void sendCanMatch(Transport.Connection connection, ShardSearchTransportRequest request, SearchTask task, - ActionListener listener) { + ActionListener listener) { boolean throwException = request.shardId().id() != 0; if (throwException && randomBoolean()) { throw new IllegalArgumentException("boom"); } else { new Thread(() -> { if (throwException == false) { - listener.onResponse(new CanMatchResponse(shard1)); + listener.onResponse(new SearchService.CanMatchResponse(shard1)); } else { listener.onFailure(new NullPointerException()); } @@ -192,8 +193,8 @@ public void sendCanMatch( Transport.Connection connection, ShardSearchTransportRequest request, SearchTask task, - ActionListener listener) { - listener.onResponse(new CanMatchResponse(randomBoolean())); + ActionListener listener) { + listener.onResponse(new SearchService.CanMatchResponse(randomBoolean())); } }; diff --git a/server/src/test/java/org/elasticsearch/indices/settings/InternalOrPrivateSettingsPlugin.java b/server/src/test/java/org/elasticsearch/indices/settings/InternalOrPrivateSettingsPlugin.java index 8792232b381db..e2592ec41ccac 100644 --- a/server/src/test/java/org/elasticsearch/indices/settings/InternalOrPrivateSettingsPlugin.java +++ b/server/src/test/java/org/elasticsearch/indices/settings/InternalOrPrivateSettingsPlugin.java @@ -64,14 +64,14 @@ public List> getSettings() { public static class UpdateInternalOrPrivateAction extends Action { - static final UpdateInternalOrPrivateAction INSTANCE = new UpdateInternalOrPrivateAction(); + public static final UpdateInternalOrPrivateAction INSTANCE = new UpdateInternalOrPrivateAction(); private static final String NAME = "indices:admin/settings/update-internal-or-private-index"; public UpdateInternalOrPrivateAction() { super(NAME); } - static class Request extends MasterNodeRequest { + public static class Request extends MasterNodeRequest { private String index; private String key; @@ -81,7 +81,7 @@ static class Request extends MasterNodeRequest { } - Request(final String index, final String key, final String value) { + public Request(final String index, final String key, final String value) { this.index = index; this.key = key; this.value = value; diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 2562683466a8c..ed9b8992577d8 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; @@ -35,7 +36,10 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.query.AbstractQueryBuilder; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.MatchNoneQueryBuilder; @@ -44,7 +48,10 @@ import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.SearchOperationListener; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.settings.InternalOrPrivateSettingsPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.SearchPlugin; import org.elasticsearch.script.MockScriptEngine; @@ -55,6 +62,7 @@ import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.support.ValueType; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.ShardFetchRequest; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.SearchContext; @@ -77,9 +85,12 @@ import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.DELETED; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.startsWith; public class SearchServiceTests extends ESSingleNodeTestCase { @@ -90,19 +101,51 @@ protected boolean resetNodeAfterTest() { @Override protected Collection> getPlugins() { - return pluginList(FailOnRewriteQueryPlugin.class, CustomScriptPlugin.class); + return pluginList(FailOnRewriteQueryPlugin.class, CustomScriptPlugin.class, InternalOrPrivateSettingsPlugin.class); } public static class CustomScriptPlugin extends MockScriptPlugin { static final String DUMMY_SCRIPT = "dummyScript"; + @Override protected Map, Object>> pluginScripts() { - return Collections.singletonMap(DUMMY_SCRIPT, vars -> { - return "dummy"; + return Collections.singletonMap(DUMMY_SCRIPT, vars -> "dummy"); + } + + @Override + public void onIndexModule(IndexModule indexModule) { + indexModule.addSearchOperationListener(new SearchOperationListener() { + @Override + public void onNewContext(SearchContext context) { + if ("throttled_threadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_throttled]")); + } else { + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); + } + } + + @Override + public void onFetchPhase(SearchContext context, long tookInNanos) { + if ("throttled_threadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_throttled]")); + } else { + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); + } + } + + @Override + public void onQueryPhase(SearchContext context, long tookInNanos) { + if ("throttled_threadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_throttled]")); + } else { + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); + } + } }); } + } @Override @@ -210,15 +253,24 @@ public void onFailure(Exception e) { final int rounds = scaledRandomIntBetween(100, 10000); for (int i = 0; i < rounds; i++) { try { - SearchPhaseResult searchPhaseResult = service.executeQueryPhase( + try { + PlainActionFuture result = new PlainActionFuture<>(); + service.executeQueryPhase( new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT, - new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, - true, null, null), - new SearchTask(123L, "", "", "", null, Collections.emptyMap())); - IntArrayList intCursors = new IntArrayList(1); - intCursors.add(0); - ShardFetchRequest req = new ShardFetchRequest(searchPhaseResult.getRequestId(), intCursors, null /* not a scroll */); - service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, + true, null, null), + new SearchTask(123L, "", "", "", null, Collections.emptyMap()), result); + SearchPhaseResult searchPhaseResult = result.get(); + IntArrayList intCursors = new IntArrayList(1); + intCursors.add(0); + ShardFetchRequest req = new ShardFetchRequest(searchPhaseResult.getRequestId(), intCursors, null/* not a scroll */); + PlainActionFuture listener = new PlainActionFuture<>(); + service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null, Collections.emptyMap()), listener); + listener.get(); + } catch (ExecutionException ex) { + assertThat(ex.getCause(), instanceOf(RuntimeException.class)); + throw ((RuntimeException)ex.getCause()); + } } catch (AlreadyClosedException ex) { throw ex; } catch (IllegalStateException ex) { @@ -467,4 +519,37 @@ public void testCanRewriteToMatchNone() { .suggest(new SuggestBuilder()))); } + + public void testSetSearchThrottled() { + createIndex("throttled_threadpool_index"); + client().execute( + InternalOrPrivateSettingsPlugin.UpdateInternalOrPrivateAction.INSTANCE, + new InternalOrPrivateSettingsPlugin.UpdateInternalOrPrivateAction.Request("throttled_threadpool_index", + IndexSettings.INDEX_SEARCH_THROTTLED.getKey(), "true")) + .actionGet(); + final SearchService service = getInstanceFromNode(SearchService.class); + Index index = resolveIndex("throttled_threadpool_index"); + assertTrue(service.getIndicesService().indexServiceSafe(index).getIndexSettings().isSearchThrottled()); + client().prepareIndex("throttled_threadpool_index", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + SearchResponse searchResponse = client().prepareSearch("throttled_threadpool_index").setSize(1).get(); + assertSearchHits(searchResponse, "1"); + // we add a search action listener in a plugin above to assert that this is actually used + client().execute( + InternalOrPrivateSettingsPlugin.UpdateInternalOrPrivateAction.INSTANCE, + new InternalOrPrivateSettingsPlugin.UpdateInternalOrPrivateAction.Request("throttled_threadpool_index", + IndexSettings.INDEX_SEARCH_THROTTLED.getKey(), "false")) + .actionGet(); + + IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> + client().admin().indices().prepareUpdateSettings("throttled_threadpool_index").setSettings(Settings.builder().put(IndexSettings + .INDEX_SEARCH_THROTTLED.getKey(), false)).get()); + assertEquals("can not update private setting [index.search.throttled]; this setting is managed by Elasticsearch", + iae.getMessage()); + assertFalse(service.getIndicesService().indexServiceSafe(index).getIndexSettings().isSearchThrottled()); + ShardSearchLocalRequest req = new ShardSearchLocalRequest(new ShardId(index, 0), 1, SearchType.QUERY_THEN_FETCH, null, + Strings.EMPTY_ARRAY, false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, false, null, null); + Thread currentThread = Thread.currentThread(); + // we still make sure can match is executed on the network thread + service.canMatch(req, ActionListener.wrap(r -> assertSame(Thread.currentThread(), currentThread), e -> fail("unexpected"))); + } }