From 9a32d97f4d692fc70e790a661a890de2ff2bc2ec Mon Sep 17 00:00:00 2001 From: jimczi Date: Thu, 21 Nov 2019 11:23:58 +0100 Subject: [PATCH 01/12] Add a SearchShardTask for search phases different from the SearchTask --- .../action/search/SearchShardTask.java | 44 +++++++++++++++++++ .../action/search/SearchTask.java | 2 +- .../action/search/SearchTransportService.java | 26 +++++------ .../search/DefaultSearchContext.java | 8 ++-- .../elasticsearch/search/SearchService.java | 20 +++++---- .../search/fetch/ShardFetchRequest.java | 4 +- .../internal/FilteredSearchContext.java | 6 +-- .../internal/InternalScrollSearchRequest.java | 4 +- .../search/internal/SearchContext.java | 6 +-- .../search/internal/ShardSearchRequest.java | 4 +- .../search/query/QueryPhase.java | 4 +- .../search/query/QuerySearchRequest.java | 4 +- .../index/SearchSlowLogTests.java | 10 ++--- .../search/SearchServiceTests.java | 6 +-- .../search/query/QueryPhaseTests.java | 24 +++++----- .../elasticsearch/test/TestSearchContext.java | 8 ++-- 16 files changed, 113 insertions(+), 67 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/search/SearchShardTask.java diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchShardTask.java b/server/src/main/java/org/elasticsearch/action/search/SearchShardTask.java new file mode 100644 index 0000000000000..4719c1fda9d53 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/search/SearchShardTask.java @@ -0,0 +1,44 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.search.fetch.ShardFetchSearchRequest; +import org.elasticsearch.search.internal.ShardSearchRequest; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.TaskId; + +import java.util.Map; + +/** + * Task storing information about a currently running search shard request. + * See {@link ShardSearchRequest}, {@link ShardFetchSearchRequest}, ... + */ +public class SearchShardTask extends CancellableTask { + + public SearchShardTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { + super(id, type, action, description, parentTaskId, headers); + } + + @Override + public boolean shouldCancelChildrenOnCancellation() { + return false; + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTask.java b/server/src/main/java/org/elasticsearch/action/search/SearchTask.java index 699448909a2b5..8e3ec00283b51 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTask.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTask.java @@ -25,7 +25,7 @@ import java.util.Map; /** - * Task storing information about a currently running search request. 
+ * Task storing information about a currently running {@link SearchRequest}. */ public class SearchTask extends CancellableTask { diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 4b66ed885db20..822557696f7e5 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -307,7 +307,7 @@ public static void registerRequestHandler(TransportService transportService, Sea transportService.registerRequestHandler(DFS_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchRequest::new, (request, channel, task) -> { - searchService.executeDfsPhase(request, (SearchTask) task, new ActionListener() { + searchService.executeDfsPhase(request, (SearchShardTask) task, new ActionListener() { @Override public void onResponse(SearchPhaseResult searchPhaseResult) { try { @@ -331,44 +331,44 @@ public void onFailure(Exception e) { transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchRequest::new, (request, channel, task) -> { - searchService.executeQueryPhase(request, (SearchTask) task, new ChannelActionListener<>( - channel, QUERY_ACTION_NAME, request)); + searchService.executeQueryPhase(request, (SearchShardTask) task, + new ChannelActionListener<>(channel, QUERY_ACTION_NAME, request)); }); TransportActionProxy.registerProxyActionWithDynamicResponseType(transportService, QUERY_ACTION_NAME, (request) -> ((ShardSearchRequest)request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new); transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SAME, QuerySearchRequest::new, (request, channel, task) -> { - searchService.executeQueryPhase(request, (SearchTask)task, new ChannelActionListener<>(channel, QUERY_ID_ACTION_NAME, - request)); + searchService.executeQueryPhase(request, (SearchShardTask) task, + new ChannelActionListener<>(channel, QUERY_ID_ACTION_NAME, request)); }); TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, QuerySearchResult::new); transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, InternalScrollSearchRequest::new, (request, channel, task) -> { - searchService.executeQueryPhase(request, (SearchTask)task, new ChannelActionListener<>(channel, QUERY_SCROLL_ACTION_NAME, - request)); + searchService.executeQueryPhase(request, (SearchShardTask) task, + new ChannelActionListener<>(channel, QUERY_SCROLL_ACTION_NAME, request)); }); TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new); transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, InternalScrollSearchRequest::new, (request, channel, task) -> { - searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel, - QUERY_FETCH_SCROLL_ACTION_NAME, request)); + searchService.executeFetchPhase(request, (SearchShardTask) task, + new ChannelActionListener<>(channel, QUERY_FETCH_SCROLL_ACTION_NAME, request)); }); TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, ScrollQueryFetchSearchResult::new); transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, ShardFetchRequest::new, (request, channel, task) -> { - searchService.executeFetchPhase(request, 
(SearchTask)task, new ChannelActionListener<>(channel, - FETCH_ID_SCROLL_ACTION_NAME, request)); + searchService.executeFetchPhase(request, (SearchShardTask) task, + new ChannelActionListener<>(channel, FETCH_ID_SCROLL_ACTION_NAME, request)); }); TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, FetchSearchResult::new); transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SAME, true, true, ShardFetchSearchRequest::new, (request, channel, task) -> { - searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel, FETCH_ID_ACTION_NAME, - request)); + searchService.executeFetchPhase(request, (SearchShardTask) task, + new ChannelActionListener<>(channel, FETCH_ID_ACTION_NAME, request)); }); TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new); diff --git a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java index 4736be6be7e43..c7f0f885d793b 100644 --- a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java @@ -24,7 +24,7 @@ import org.apache.lucene.search.Collector; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.Query; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; @@ -120,7 +120,7 @@ final class DefaultSearchContext extends SearchContext { private boolean lowLevelCancellation; // filter for sliced scroll private SliceBuilder sliceBuilder; - private SearchTask task; + private SearchShardTask task; /** * The original query as sent by the user without the types and aliases @@ -805,12 +805,12 @@ public void setProfilers(Profilers profilers) { } @Override - public void setTask(SearchTask task) { + public void setTask(SearchShardTask task) { this.task = task; } @Override - public SearchTask getTask() { + public SearchShardTask getTask() { return task; } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index dfefef88292b4..62184e082b901 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -27,7 +27,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.OriginalIndices; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; @@ -301,11 +301,11 @@ protected void doClose() { keepAliveReaper.cancel(); } - public void executeDfsPhase(ShardSearchRequest request, SearchTask task, ActionListener listener) { + public void executeDfsPhase(ShardSearchRequest request, SearchShardTask task, ActionListener listener) { rewriteShardRequest(request, ActionListener.map(listener, r -> executeDfsPhase(r, task))); } - private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchTask task) throws IOException { + private DfsSearchResult 
executeDfsPhase(ShardSearchRequest request, SearchShardTask task) throws IOException { final SearchContext context = createAndPutContext(request); context.incRef(); try { @@ -336,7 +336,7 @@ private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final Sea } } - public void executeQueryPhase(ShardSearchRequest request, SearchTask task, ActionListener listener) { + public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, ActionListener listener) { rewriteShardRequest(request, ActionListener.map(listener, r -> executeQueryPhase(r, task))); } @@ -344,7 +344,7 @@ private void runAsync(long id, Supplier executable, ActionListener lis getExecutor(id).execute(ActionRunnable.supply(listener, executable::get)); } - private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws Exception { + private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchShardTask task) throws Exception { final SearchContext context = createAndPutContext(request); context.incRef(); try { @@ -392,7 +392,9 @@ private QueryFetchSearchResult executeFetchPhase(SearchContext context, long aft return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); } - public void executeQueryPhase(InternalScrollSearchRequest request, SearchTask task, ActionListener listener) { + public void executeQueryPhase(InternalScrollSearchRequest request, + SearchShardTask task, + ActionListener listener) { runAsync(request.id(), () -> { final SearchContext context = findContext(request.id(), request); context.incRef(); @@ -414,7 +416,7 @@ public void executeQueryPhase(InternalScrollSearchRequest request, SearchTask ta }, listener); } - public void executeQueryPhase(QuerySearchRequest request, SearchTask task, ActionListener listener) { + public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.id(), () -> { final SearchContext context = findContext(request.id(), request); context.setTask(task); @@ -464,7 +466,7 @@ private Executor getExecutor(IndexShard indexShard) { return threadPool.executor(indexShard.indexSettings().isSearchThrottled() ? 
Names.SEARCH_THROTTLED : Names.SEARCH); } - public void executeFetchPhase(InternalScrollSearchRequest request, SearchTask task, + public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.id(), () -> { final SearchContext context = findContext(request.id(), request); @@ -487,7 +489,7 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchTask ta }, listener); } - public void executeFetchPhase(ShardFetchRequest request, SearchTask task, ActionListener listener) { + public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.id(), () -> { final SearchContext context = findContext(request.id(), request); context.incRef(); diff --git a/server/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java b/server/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java index cb2bc99370bca..2e7c59b329874 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java @@ -22,7 +22,7 @@ import com.carrotsearch.hppc.IntArrayList; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.ScoreDoc; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.Lucene; @@ -112,7 +112,7 @@ public ScoreDoc lastEmittedDoc() { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new SearchTask(id, type, action, getDescription(), parentTaskId, headers); + return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java b/server/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java index 10eb90afc04fe..8c04954a4efcf 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java @@ -22,7 +22,7 @@ import org.apache.lucene.search.Collector; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.Query; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; @@ -535,12 +535,12 @@ public QueryShardContext getQueryShardContext() { } @Override - public void setTask(SearchTask task) { + public void setTask(SearchShardTask task) { in.setTask(task); } @Override - public SearchTask getTask() { + public SearchShardTask getTask() { return in.getTask(); } diff --git a/server/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java b/server/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java index 9d7ba557cc260..fe8d173711c69 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java @@ -20,7 +20,7 @@ package org.elasticsearch.search.internal; import org.elasticsearch.action.search.SearchScrollRequest; -import 
org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.Scroll; @@ -73,7 +73,7 @@ public InternalScrollSearchRequest scroll(Scroll scroll) { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new SearchTask(id, type, action, getDescription(), parentTaskId, headers); + return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java b/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java index fba80d5f3c6e0..c0477ece69aa0 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java @@ -22,7 +22,7 @@ import org.apache.lucene.search.Collector; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.Query; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lease.Releasable; @@ -93,9 +93,9 @@ protected SearchContext() { super("search_context"); } - public abstract void setTask(SearchTask task); + public abstract void setTask(SearchShardTask task); - public abstract SearchTask getTask(); + public abstract SearchShardTask getTask(); public abstract boolean isCancelled(); diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java index f1f38d700cc5d..27543a2285d32 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java @@ -23,7 +23,7 @@ import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.metadata.AliasMetaData; @@ -292,7 +292,7 @@ public String getClusterAlias() { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new SearchTask(id, type, action, getDescription(), parentTaskId, headers); + return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 7f3a7a5b1b513..a715d9494ca5d 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -37,7 +37,7 @@ import org.apache.lucene.search.Sort; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TotalHits; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; 
import org.elasticsearch.common.util.concurrent.QueueResizingEsThreadPoolExecutor; @@ -224,7 +224,7 @@ static boolean execute(SearchContext searchContext, final Runnable cancellationRunnable; if (searchContext.lowLevelCancellation()) { - SearchTask task = searchContext.getTask(); + SearchShardTask task = searchContext.getTask(); cancellationRunnable = () -> { if (task.isCancelled()) throw new TaskCancelledException("cancelled"); }; } else { cancellationRunnable = null; diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java index e458603310ce4..d3919ec3aba48 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java @@ -21,7 +21,7 @@ import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.OriginalIndices; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; @@ -86,7 +86,7 @@ public IndicesOptions indicesOptions() { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new SearchTask(id, type, action, getDescription(), parentTaskId, headers); + return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers); } public String getDescription() { diff --git a/server/src/test/java/org/elasticsearch/index/SearchSlowLogTests.java b/server/src/test/java/org/elasticsearch/index/SearchSlowLogTests.java index e5cccd4c15c32..63424cf2b2aca 100644 --- a/server/src/test/java/org/elasticsearch/index/SearchSlowLogTests.java +++ b/server/src/test/java/org/elasticsearch/index/SearchSlowLogTests.java @@ -20,7 +20,7 @@ package org.elasticsearch.index; import org.elasticsearch.Version; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.logging.ESLogMessage; @@ -78,7 +78,7 @@ public void testSlowLogHasJsonFields() throws IOException { SearchContext searchContext = createSearchContext(index); SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); searchContext.request().source(source); - searchContext.setTask(new SearchTask(0, "n/a", "n/a", "test", null, + searchContext.setTask(new SearchShardTask(0, "n/a", "n/a", "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "my_id"))); ESLogMessage p = SearchSlowLog.SearchSlowLogMessage.of(searchContext, 10); @@ -97,7 +97,7 @@ public void testSlowLogsWithStats() throws IOException { SearchContext searchContext = createSearchContext(index,"group1"); SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); searchContext.request().source(source); - searchContext.setTask(new SearchTask(0, "n/a", "n/a", "test", null, + searchContext.setTask(new SearchShardTask(0, "n/a", "n/a", "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "my_id"))); ESLogMessage p = SearchSlowLog.SearchSlowLogMessage.of(searchContext, 10); @@ -106,7 +106,7 @@ public void testSlowLogsWithStats() throws IOException { searchContext = createSearchContext(index, "group1", "group2"); source = 
SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); searchContext.request().source(source); - searchContext.setTask(new SearchTask(0, "n/a", "n/a", "test", null, + searchContext.setTask(new SearchShardTask(0, "n/a", "n/a", "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "my_id"))); p = SearchSlowLog.SearchSlowLogMessage.of(searchContext, 10); assertThat(p.get("stats"), equalTo("[\\\"group1\\\", \\\"group2\\\"]")); @@ -117,7 +117,7 @@ public void testSlowLogSearchContextPrinterToLog() throws IOException { SearchContext searchContext = createSearchContext(index); SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); searchContext.request().source(source); - searchContext.setTask(new SearchTask(0, "n/a", "n/a", "test", null, + searchContext.setTask(new SearchShardTask(0, "n/a", "n/a", "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "my_id"))); ESLogMessage p = SearchSlowLog.SearchSlowLogMessage.of(searchContext, 10); assertThat(p.getFormattedMessage(), startsWith("[foo][0]")); diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 00c6a0cc8b15f..445e4feea97e8 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -32,7 +32,7 @@ import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; @@ -313,13 +313,13 @@ public void onFailure(Exception e) { new ShardSearchRequest(OriginalIndices.NONE, useScroll ? 
scrollSearchRequest : searchRequest, indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null), - new SearchTask(123L, "", "", "", null, Collections.emptyMap()), result); + new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()), result); SearchPhaseResult searchPhaseResult = result.get(); IntArrayList intCursors = new IntArrayList(1); intCursors.add(0); ShardFetchRequest req = new ShardFetchRequest(searchPhaseResult.getRequestId(), intCursors, null/* not a scroll */); PlainActionFuture listener = new PlainActionFuture<>(); - service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null, Collections.emptyMap()), listener); + service.executeFetchPhase(req, new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()), listener); listener.get(); if (useScroll) { service.freeContext(searchPhaseResult.getRequestId()); diff --git a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java index 2190e573707e6..39d8bd631c3ba 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java @@ -63,7 +63,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.ParsedQuery; import org.elasticsearch.index.search.ESToParentBlockJoinQuery; @@ -110,7 +110,7 @@ private void countTestCase(Query query, IndexReader reader, boolean shouldCollec TestSearchContext context = new TestSearchContext(null, indexShard); context.parsedQuery(new ParsedQuery(query)); context.setSize(0); - context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); final IndexSearcher searcher = shouldCollectSearch ? 
new IndexSearcher(reader) : getAssertingEarlyTerminationSearcher(reader, 0); @@ -198,7 +198,7 @@ public void testPostFilterDisablesCountOptimization() throws Exception { IndexReader reader = DirectoryReader.open(dir); IndexSearcher contextSearcher = getAssertingEarlyTerminationSearcher(reader, 0); TestSearchContext context = new TestSearchContext(null, indexShard); - context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); QueryPhase.execute(context, contextSearcher, checkCancelled -> {}); @@ -228,7 +228,7 @@ public void testTerminateAfterWithFilter() throws Exception { IndexReader reader = DirectoryReader.open(dir); IndexSearcher contextSearcher = new IndexSearcher(reader); TestSearchContext context = new TestSearchContext(null, indexShard); - context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); context.terminateAfter(1); context.setSize(10); @@ -257,7 +257,7 @@ public void testMinScoreDisablesCountOptimization() throws Exception { TestSearchContext context = new TestSearchContext(null, indexShard); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); context.setSize(0); - context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); QueryPhase.execute(context, contextSearcher, checkCancelled -> {}); assertEquals(1, context.queryResult().topDocs().topDocs.totalHits.value); @@ -271,7 +271,7 @@ public void testMinScoreDisablesCountOptimization() throws Exception { public void testQueryCapturesThreadPoolStats() throws Exception { TestSearchContext context = new TestSearchContext(null, indexShard); - context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); Directory dir = newDirectory(); @@ -313,7 +313,7 @@ public void testInOrderScrollOptimization() throws Exception { scrollContext.maxScore = Float.NaN; scrollContext.totalHits = null; context.scrollContext(scrollContext); - context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); int size = randomIntBetween(2, 5); context.setSize(size); @@ -351,7 +351,7 @@ public void testTerminateAfterEarlyTermination() throws Exception { } w.close(); TestSearchContext context = new TestSearchContext(null, indexShard); - context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); final IndexReader reader = DirectoryReader.open(dir); @@ -458,7 +458,7 @@ public void testIndexSortingEarlyTermination() throws Exception { TestSearchContext context = new TestSearchContext(null, indexShard); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); context.setSize(1); - context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); 
context.sort(new SortAndFormats(sort, new DocValueFormat[] {DocValueFormat.RAW})); final IndexReader reader = DirectoryReader.open(dir); @@ -545,7 +545,7 @@ public void testIndexSortScrollOptimization() throws Exception { scrollContext.maxScore = Float.NaN; scrollContext.totalHits = null; context.scrollContext(scrollContext); - context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); context.setSize(10); context.sort(searchSortAndFormat); @@ -599,7 +599,7 @@ public void testDisableTopScoreCollection() throws Exception { IndexReader reader = DirectoryReader.open(dir); IndexSearcher contextSearcher = new IndexSearcher(reader); TestSearchContext context = new TestSearchContext(null, indexShard); - context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); Query q = new SpanNearQuery.Builder("title", true) .addClause(new SpanTermQuery(new Term("title", "foo"))) .addClause(new SpanTermQuery(new Term("title", "bar"))) @@ -704,7 +704,7 @@ public void testMinScore() throws Exception { .build() )); context.minimumScore(0.01f); - context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); context.setSize(1); context.trackTotalHitsUpTo(5); diff --git a/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java b/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java index 83da817f64bf2..32b1245650e94 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java +++ b/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java @@ -22,7 +22,7 @@ import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.Query; import org.elasticsearch.action.OriginalIndices; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; @@ -79,7 +79,7 @@ public class TestSearchContext extends SearchContext { ParsedQuery postFilter; Query query; Float minScore; - SearchTask task; + SearchShardTask task; SortAndFormats sort; boolean trackScores = false; int trackTotalHitsUpTo = SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO; @@ -604,12 +604,12 @@ public QueryShardContext getQueryShardContext() { } @Override - public void setTask(SearchTask task) { + public void setTask(SearchShardTask task) { this.task = task; } @Override - public SearchTask getTask() { + public SearchShardTask getTask() { return task; } From 55ac3a01f9c8d61459211348734d64916582932a Mon Sep 17 00:00:00 2001 From: jimczi Date: Thu, 21 Nov 2019 14:22:26 +0100 Subject: [PATCH 02/12] Revert "Split search in two when made against throttled and non throttled searches (#42510)" This reverts commit dd1ce50c0aa5fef10bfc996d1c74786c0a044382. 
--- .../action/search/TransportSearchAction.java | 178 +++++------------- .../TransportSearchActionSingleNodeTests.java | 61 ------ .../search/TransportSearchActionTests.java | 75 -------- .../test/indices.freeze/10_basic.yml | 19 -- 4 files changed, 44 insertions(+), 289 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index d4832fb0d7a10..a7c0a785c7fce 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -29,7 +29,6 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -473,89 +472,10 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea Map> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(), searchRequest.indices()); routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap); - Map concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState); - - if (shouldSplitIndices(searchRequest)) { - //Execute two separate searches when we can, so that indices that are being written to are searched as quickly as possible. - //Otherwise their search context would need to stay open for too long between the query and the fetch phase, due to other - //indices (possibly slower) being searched at the same time. - List writeIndicesList = new ArrayList<>(); - List readOnlyIndicesList = new ArrayList<>(); - splitIndices(indices, clusterState, writeIndicesList, readOnlyIndicesList); - String[] writeIndices = writeIndicesList.toArray(new String[0]); - String[] readOnlyIndices = readOnlyIndicesList.toArray(new String[0]); - - if (readOnlyIndices.length == 0) { - executeSearch(task, timeProvider, searchRequest, localIndices, writeIndices, routingMap, - aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters); - } else if (writeIndices.length == 0 && remoteShardIterators.isEmpty()) { - executeSearch(task, timeProvider, searchRequest, localIndices, readOnlyIndices, routingMap, - aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters); - } else { - //Split the search in two whenever throttled indices are searched together with ordinary indices (local or remote), so - //that we don't keep the search context open for too long between query and fetch for ordinary indices due to slow indices. 
- CountDown countDown = new CountDown(2); - AtomicReference exceptions = new AtomicReference<>(); - SearchResponseMerger searchResponseMerger = createSearchResponseMerger(searchRequest.source(), timeProvider, - searchService::createReduceContext); - CountDownActionListener countDownActionListener = - new CountDownActionListener<>(countDown, exceptions, listener) { - @Override - void innerOnResponse(SearchResponse searchResponse) { - searchResponseMerger.add(searchResponse); - } - - @Override - SearchResponse createFinalResponse() { - return searchResponseMerger.getMergedResponse(clusters); - } - }; - - //Note that the indices set to the new SearchRequest won't be retrieved from it, as they have been already resolved and - //will be provided separately to executeSearch. - SearchRequest writeIndicesRequest = SearchRequest.subSearchRequest(searchRequest, writeIndices, - RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false); - executeSearch(task, timeProvider, writeIndicesRequest, localIndices, writeIndices, routingMap, - aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, countDownActionListener, - SearchResponse.Clusters.EMPTY); - - //Note that the indices set to the new SearchRequest won't be retrieved from it, as they have been already resolved and - //will be provided separately to executeSearch. - SearchRequest readOnlyIndicesRequest = SearchRequest.subSearchRequest(searchRequest, readOnlyIndices, - RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false); - executeSearch(task, timeProvider, readOnlyIndicesRequest, localIndices, readOnlyIndices, routingMap, - aliasFilter, concreteIndexBoosts, Collections.emptyList(), (alias, id) -> null, clusterState, countDownActionListener, - SearchResponse.Clusters.EMPTY); - } - } else { - String[] concreteIndices = Arrays.stream(indices).map(Index::getName).toArray(String[]::new); - executeSearch(task, timeProvider, searchRequest, localIndices, concreteIndices, routingMap, - aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters); - } - } - - static boolean shouldSplitIndices(SearchRequest searchRequest) { - return searchRequest.scroll() == null && searchRequest.searchType() != DFS_QUERY_THEN_FETCH - && (searchRequest.source() == null || searchRequest.source().size() != 0); - } - - static void splitIndices(Index[] indices, ClusterState clusterState, List writeIndices, List readOnlyIndices) { - for (Index index : indices) { - ClusterBlockException writeBlock = clusterState.blocks().indexBlockedException(ClusterBlockLevel.WRITE, index.getName()); - if (writeBlock == null) { - writeIndices.add(index.getName()); - } else { - readOnlyIndices.add(index.getName()); - } + String[] concreteIndices = new String[indices.length]; + for (int i = 0; i < indices.length; i++) { + concreteIndices[i] = indices[i].getName(); } - } - - private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, - OriginalIndices localIndices, String[] concreteIndices, Map> routingMap, - Map aliasFilter, Map concreteIndexBoosts, - List remoteShardIterators, BiFunction remoteConnections, - ClusterState clusterState, ActionListener listener, SearchResponse.Clusters clusters) { - Map nodeSearchCounts = searchTransportService.getPendingSearchRequests(); GroupShardsIterator localShardsIterator = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, 
searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts); @@ -564,6 +484,8 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea failIfOverShardCountLimit(clusterService, shardIterators.size()); + Map concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState); + // optimize search type for cases where there is only one shard group to search on if (shardIterators.size() == 1) { // if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard @@ -576,9 +498,11 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea if (searchRequest.isSuggestOnly()) { // disable request cache if we have only suggest searchRequest.requestCache(false); - if (searchRequest.searchType() == DFS_QUERY_THEN_FETCH) { - // convert to Q_T_F if we have only suggest - searchRequest.searchType(QUERY_THEN_FETCH); + switch (searchRequest.searchType()) { + case DFS_QUERY_THEN_FETCH: + // convert to Q_T_F if we have only suggest + searchRequest.searchType(QUERY_THEN_FETCH); + break; } } @@ -687,16 +611,22 @@ private static void failIfOverShardCountLimit(ClusterService clusterService, int } } - abstract static class CountDownActionListener implements ActionListener { + abstract static class CCSActionListener implements ActionListener { + private final String clusterAlias; + private final boolean skipUnavailable; private final CountDown countDown; + private final AtomicInteger skippedClusters; private final AtomicReference exceptions; - private final ActionListener delegateListener; + private final ActionListener originalListener; - CountDownActionListener(CountDown countDown, AtomicReference exceptions, - ActionListener delegateListener) { + CCSActionListener(String clusterAlias, boolean skipUnavailable, CountDown countDown, AtomicInteger skippedClusters, + AtomicReference exceptions, ActionListener originalListener) { + this.clusterAlias = clusterAlias; + this.skipUnavailable = skipUnavailable; this.countDown = countDown; + this.skippedClusters = skippedClusters; this.exceptions = exceptions; - this.delegateListener = delegateListener; + this.originalListener = originalListener; } @Override @@ -707,7 +637,26 @@ public final void onResponse(Response response) { abstract void innerOnResponse(Response response); - final void maybeFinish() { + @Override + public final void onFailure(Exception e) { + if (skipUnavailable) { + skippedClusters.incrementAndGet(); + } else { + Exception exception = e; + if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias) == false) { + exception = wrapRemoteClusterFailure(clusterAlias, e); + } + if (exceptions.compareAndSet(null, exception) == false) { + exceptions.accumulateAndGet(exception, (previous, current) -> { + current.addSuppressed(previous); + return current; + }); + } + } + maybeFinish(); + } + + private void maybeFinish() { if (countDown.countDown()) { Exception exception = exceptions.get(); if (exception == null) { @@ -715,56 +664,17 @@ final void maybeFinish() { try { response = createFinalResponse(); } catch(Exception e) { - delegateListener.onFailure(e); + originalListener.onFailure(e); return; } - delegateListener.onResponse(response); + originalListener.onResponse(response); } else { - delegateListener.onFailure(exceptions.get()); + originalListener.onFailure(exceptions.get()); } } } abstract FinalResponse createFinalResponse(); - - @Override - public void onFailure(Exception e) { - if 
(exceptions.compareAndSet(null, e) == false) { - exceptions.accumulateAndGet(e, (previous, current) -> { - current.addSuppressed(previous); - return current; - }); - } - maybeFinish(); - } - } - - abstract static class CCSActionListener extends CountDownActionListener { - private final String clusterAlias; - private final boolean skipUnavailable; - private final AtomicInteger skippedClusters; - - CCSActionListener(String clusterAlias, boolean skipUnavailable, CountDown countDown, AtomicInteger skippedClusters, - AtomicReference exceptions, ActionListener originalListener) { - super(countDown, exceptions, originalListener); - this.clusterAlias = clusterAlias; - this.skipUnavailable = skipUnavailable; - this.skippedClusters = skippedClusters; - } - - @Override - public final void onFailure(Exception e) { - if (skipUnavailable) { - skippedClusters.incrementAndGet(); - maybeFinish(); - } else { - Exception exception = e; - if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias) == false) { - exception = wrapRemoteClusterFailure(clusterAlias, e); - } - super.onFailure(exception); - } - } } private static RemoteTransportException wrapRemoteClusterFailure(String clusterAlias, Exception e) { diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java index fa6160839d2a9..10f252c30dc3b 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java @@ -19,14 +19,11 @@ package org.elasticsearch.action.search; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; @@ -177,62 +174,4 @@ public void testFinalReduce() { assertEquals(2, longTerms.getBuckets().size()); } } - - public void testSplitIndices() { - { - CreateIndexResponse response = client().admin().indices().prepareCreate("write").get(); - assertTrue(response.isAcknowledged()); - } - { - CreateIndexResponse response = client().admin().indices().prepareCreate("readonly").get(); - assertTrue(response.isAcknowledged()); - } - { - SearchResponse response = client().prepareSearch("readonly").get(); - assertEquals(1, response.getTotalShards()); - assertEquals(1, response.getSuccessfulShards()); - assertEquals(1, response.getNumReducePhases()); - } - { - SearchResponse response = client().prepareSearch("write").get(); - assertEquals(1, response.getTotalShards()); - assertEquals(1, response.getSuccessfulShards()); - assertEquals(1, response.getNumReducePhases()); - } - { - SearchResponse response = client().prepareSearch("readonly", "write").get(); - assertEquals(2, response.getTotalShards()); - assertEquals(2, response.getSuccessfulShards()); - assertEquals(1, response.getNumReducePhases()); - } - { - Settings settings = Settings.builder().put("index.blocks.read_only", "true").build(); - AcknowledgedResponse response = 
client().admin().indices().prepareUpdateSettings("readonly").setSettings(settings).get(); - assertTrue(response.isAcknowledged()); - } - try { - { - SearchResponse response = client().prepareSearch("readonly").get(); - assertEquals(1, response.getTotalShards()); - assertEquals(1, response.getSuccessfulShards()); - assertEquals(1, response.getNumReducePhases()); - } - { - SearchResponse response = client().prepareSearch("write").get(); - assertEquals(1, response.getTotalShards()); - assertEquals(1, response.getSuccessfulShards()); - assertEquals(1, response.getNumReducePhases()); - } - { - SearchResponse response = client().prepareSearch("readonly", "write").get(); - assertEquals(2, response.getTotalShards()); - assertEquals(2, response.getSuccessfulShards()); - assertEquals(3, response.getNumReducePhases()); - } - } finally { - Settings settings = Settings.builder().put("index.blocks.read_only", "false").build(); - AcknowledgedResponse response = client().admin().indices().prepareUpdateSettings("readonly").setSettings(settings).get(); - assertTrue(response.isAcknowledged()); - } - } } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index a2c83c43a9770..e9aeff6847ad5 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -29,10 +29,6 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.GroupShardsIteratorTests; @@ -841,75 +837,4 @@ public void testShouldMinimizeRoundtrips() throws Exception { assertFalse(TransportSearchAction.shouldMinimizeRoundtrips(searchRequest)); } } - - public void testShouldSplitIndices() { - { - SearchRequest searchRequest = new SearchRequest(); - assertTrue(TransportSearchAction.shouldSplitIndices(searchRequest)); - } - { - SearchRequest searchRequest = new SearchRequest(); - searchRequest.source(new SearchSourceBuilder()); - assertTrue(TransportSearchAction.shouldSplitIndices(searchRequest)); - } - { - SearchRequest searchRequest = new SearchRequest(); - searchRequest.source(new SearchSourceBuilder().size(randomIntBetween(1, 100))); - assertTrue(TransportSearchAction.shouldSplitIndices(searchRequest)); - } - { - SearchRequest searchRequest = new SearchRequest(); - searchRequest.scroll("5s"); - assertFalse(TransportSearchAction.shouldSplitIndices(searchRequest)); - } - { - SearchRequest searchRequest = new SearchRequest(); - searchRequest.source(new SearchSourceBuilder().size(0)); - assertFalse(TransportSearchAction.shouldSplitIndices(searchRequest)); - } - { - SearchRequest searchRequest = new SearchRequest(); - searchRequest.searchType(SearchType.DFS_QUERY_THEN_FETCH); - assertFalse(TransportSearchAction.shouldSplitIndices(searchRequest)); - } - } - - public void testSplitIndices() { - int numIndices = randomIntBetween(1, 10); - Index[] indices = new Index[numIndices]; - for 
(int i = 0; i < numIndices; i++) { - String indexName = randomAlphaOfLengthBetween(5, 10); - indices[i] = new Index(indexName, indexName + "-uuid"); - } - { - ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).build(); - List writeIndices = new ArrayList<>(); - List readOnlyIndices = new ArrayList<>(); - TransportSearchAction.splitIndices(indices, clusterState, writeIndices, readOnlyIndices); - assertEquals(0, readOnlyIndices.size()); - assertEquals(numIndices, writeIndices.size()); - } - { - List expectedWrite = new ArrayList<>(); - List expectedReadOnly = new ArrayList<>(); - ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder(); - for (Index index : indices) { - if (randomBoolean()) { - blocksBuilder.addIndexBlock(index.getName(), IndexMetaData.INDEX_WRITE_BLOCK); - expectedReadOnly.add(index.getName()); - } else if(randomBoolean() ){ - blocksBuilder.addIndexBlock(index.getName(), IndexMetaData.INDEX_READ_ONLY_BLOCK); - expectedReadOnly.add(index.getName()); - } else { - expectedWrite.add(index.getName()); - } - } - ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).blocks(blocksBuilder).build(); - List writeIndices = new ArrayList<>(); - List readOnlyIndices = new ArrayList<>(); - TransportSearchAction.splitIndices(indices, clusterState, writeIndices, readOnlyIndices); - assertEquals(writeIndices, expectedWrite); - assertEquals(readOnlyIndices, expectedReadOnly); - } - } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/indices.freeze/10_basic.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/indices.freeze/10_basic.yml index 4a5f713ea74a5..4ba49e53308d2 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/indices.freeze/10_basic.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/indices.freeze/10_basic.yml @@ -77,25 +77,6 @@ - match: {hits.total: 0} -- do: - index: - index: ordinary - id: "1" - body: { "foo": "Hello: 1" } - refresh: wait_for - -- do: - search: - rest_total_hits_as_int: true - index: [test, ordinary] - ignore_throttled: false - body: - query: - match: - foo: hello - -- match: {hits.total: 3} - --- "Test index options": - do: From 931d8d0df4e163113fbcc1788c19037acc56f914 Mon Sep 17 00:00:00 2001 From: jimczi Date: Fri, 22 Nov 2019 00:05:13 +0100 Subject: [PATCH 03/12] Add a listener to track the progress of a search request locally This commit adds a function in NodeClient that allows to track the progress of a search request locally. Progress is tracked through a SearchProgressListener that exposes query and fetch responses as well as partial and final reduces. This new method can be used by modules/plugins inside a node in order to track the progress of a local search request. 
Relates #49091 --- .../search/AbstractSearchAsyncAction.java | 4 +- .../search/CanMatchPreFilterSearchPhase.java | 2 +- .../action/search/DfsQueryPhase.java | 7 +- .../action/search/FetchSearchPhase.java | 11 + .../action/search/SearchActionListener.java | 2 +- .../action/search/SearchPhaseController.java | 47 +++- .../action/search/SearchPhaseResults.java | 2 +- .../search/SearchProgressActionListener.java | 29 +++ .../action/search/SearchProgressListener.java | 111 +++++++++ .../SearchQueryThenFetchAsyncAction.java | 7 +- .../action/search/SearchShard.java | 72 ++++++ .../action/search/SearchTask.java | 16 +- .../elasticsearch/client/node/NodeClient.java | 38 +++ .../action/search/FetchSearchPhaseTests.java | 18 +- .../SearchActionProgressListenerIT.java | 218 ++++++++++++++++++ .../search/SearchPhaseControllerTests.java | 103 ++++++++- .../action/search/TransportSearchIT.java | 1 - 17 files changed, 657 insertions(+), 31 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/search/SearchProgressActionListener.java create mode 100644 server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java create mode 100644 server/src/main/java/org/elasticsearch/action/search/SearchShard.java create mode 100644 server/src/test/java/org/elasticsearch/action/search/SearchActionProgressListenerIT.java diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 2d2b9213c63ff..7a7b6ae7df43d 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -87,7 +87,7 @@ abstract class AbstractSearchAsyncAction exten private final SearchResponse.Clusters clusters; private final GroupShardsIterator toSkipShardsIts; - private final GroupShardsIterator shardsIts; + protected final GroupShardsIterator shardsIts; private final int expectedTotalOps; private final AtomicInteger totalOps = new AtomicInteger(); private final int maxConcurrentRequestsPerNode; @@ -443,7 +443,7 @@ public final void onShardFailure(final int shardIndex, @Nullable SearchShardTarg successfulOps.decrementAndGet(); // if this shard was successful before (initial phase) we have to adjust the counter } } - results.consumeShardFailure(shardIndex); + results.consumeShardFailure(shardIndex, e); } /** diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index aba32d2c850a0..1b1cd8efdb0d5 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -159,7 +159,7 @@ boolean hasResult(int shardIndex) { } @Override - void consumeShardFailure(int shardIndex) { + void consumeShardFailure(int shardIndex, Exception exc) { // we have to carry over shard failures in order to account for them in the response. 
consumeResult(shardIndex, true, null); } diff --git a/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java b/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java index 6fb6a49e3f557..a64816d29a5ef 100644 --- a/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.dfs.DfsSearchResult; import org.elasticsearch.search.query.QuerySearchRequest; @@ -46,13 +47,15 @@ final class DfsQueryPhase extends SearchPhase { private final Function, SearchPhase> nextPhaseFactory; private final SearchPhaseContext context; private final SearchTransportService searchTransportService; + private final SearchProgressListener progressListener; DfsQueryPhase(AtomicArray dfsSearchResults, SearchPhaseController searchPhaseController, Function, SearchPhase> nextPhaseFactory, SearchPhaseContext context) { super("dfs_query"); - this.queryResult = searchPhaseController.newSearchPhaseResults(context.getRequest(), context.getNumShards()); + this.progressListener = context.getTask().getProgressListener(); + this.queryResult = searchPhaseController.newSearchPhaseResults(progressListener, context.getRequest(), context.getNumShards()); this.searchPhaseController = searchPhaseController; this.dfsSearchResults = dfsSearchResults; this.nextPhaseFactory = nextPhaseFactory; @@ -69,6 +72,8 @@ public void run() throws IOException { final CountedCollector counter = new CountedCollector<>(queryResult::consumeResult, resultList.size(), () -> context.executeNextPhase(this, nextPhaseFactory.apply(queryResult)), context); + final SearchSourceBuilder sourceBuilder = context.getRequest().source(); + progressListener.onListShards(progressListener.searchShards(resultList), sourceBuilder == null || sourceBuilder.size() != 0); for (final DfsSearchResult dfsResult : resultList) { final SearchShardTarget searchShardTarget = dfsResult.getSearchShardTarget(); Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId()); diff --git a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java index cc16e2d9f182a..874c15ff83c80 100644 --- a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java @@ -34,6 +34,7 @@ import org.elasticsearch.transport.Transport; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.function.BiFunction; @@ -49,6 +50,7 @@ final class FetchSearchPhase extends SearchPhase { private final SearchPhaseContext context; private final Logger logger; private final SearchPhaseResults resultConsumer; + private final SearchProgressListener progressListener; FetchSearchPhase(SearchPhaseResults resultConsumer, SearchPhaseController searchPhaseController, @@ -72,6 +74,7 @@ final class FetchSearchPhase extends SearchPhase { this.context = context; this.logger = context.getLogger(); this.resultConsumer = resultConsumer; + this.progressListener = context.getTask().getProgressListener(); } 
@Override @@ -136,6 +139,8 @@ private void innerRun() throws IOException { } // in any case we count down this result since we don't talk to this shard anymore counter.countDown(); + // empty result + progressListener.onFetchResult(queryResult.fetchResult()); } else { SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget(); Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), @@ -164,11 +169,16 @@ private void executeFetch(final int shardIndex, final SearchShardTarget shardTar new SearchActionListener(shardTarget, shardIndex) { @Override public void innerOnResponse(FetchSearchResult result) { + boolean success = false; try { counter.onResult(result); + success = true; } catch (Exception e) { context.onPhaseFailure(FetchSearchPhase.this, "", e); } + if (success) { + progressListener.onFetchResult(result); + } } @Override @@ -182,6 +192,7 @@ public void onFailure(Exception e) { // request to clear the search context. releaseIrrelevantSearchContext(querySearchResult); } + progressListener.onFetchFailure(shardIndex, e); } }); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchActionListener.java b/server/src/main/java/org/elasticsearch/action/search/SearchActionListener.java index e9b5598556ff7..356da9e967623 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchActionListener.java @@ -23,7 +23,7 @@ import org.elasticsearch.search.SearchShardTarget; /** - * An base action listener that ensures shard target and shard index is set on all responses + * A base action listener that ensures shard target and shard index is set on all responses * received by this listener. */ abstract class SearchActionListener implements ActionListener { diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 09811fd1d2ff1..f630a6a100a03 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -571,6 +571,7 @@ static final class QueryPhaseResultConsumer extends ArraySearchPhaseResults= 2 if there is more than one expected result"); @@ -595,6 +597,7 @@ private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedR throw new IllegalArgumentException("either aggs or top docs must be present"); } this.controller = controller; + this.progressListener = progressListener; // no need to buffer anything if we have less expected results. in this case we don't consume any results ahead of time. this.aggsBuffer = new InternalAggregations[hasAggs ? bufferSize : 0]; this.topDocsBuffer = new TopDocs[hasTopDocs ? 
bufferSize : 0]; @@ -605,11 +608,17 @@ private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedR this.performFinalReduce = performFinalReduce; } + @Override + void consumeShardFailure(int shardIndex, Exception exc) { + progressListener.onQueryFailure(shardIndex, exc); + } + @Override public void consumeResult(SearchPhaseResult result) { super.consumeResult(result); QuerySearchResult queryResult = result.queryResult(); consumeInternal(queryResult); + progressListener.onQueryResult(queryResult); } private synchronized void consumeInternal(QuerySearchResult querySearchResult) { @@ -629,6 +638,10 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) { } numReducePhases++; index = 1; + if (hasAggs) { + progressListener.onPartialReduce(progressListener.searchShards(results.asList()), + topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases); + } } final int i = index++; if (hasAggs) { @@ -652,8 +665,10 @@ private synchronized List getRemainingTopDocs() { @Override public ReducedQueryPhase reduce() { - return controller.reducedQueryPhase(results.asList(), getRemainingAggs(), getRemainingTopDocs(), topDocsStats, - numReducePhases, false, performFinalReduce); + ReducedQueryPhase reducePhase = controller.reducedQueryPhase(results.asList(), + getRemainingAggs(), getRemainingTopDocs(), topDocsStats, numReducePhases, false, performFinalReduce); + progressListener.onReduce(progressListener.searchShards(results.asList()), reducePhase.totalHits, reducePhase.aggregations); + return reducePhase; } /** @@ -678,7 +693,9 @@ private int resolveTrackTotalHits(SearchRequest request) { /** * Returns a new ArraySearchPhaseResults instance. This might return an instance that reduces search responses incrementally. */ - ArraySearchPhaseResults newSearchPhaseResults(SearchRequest request, int numShards) { + ArraySearchPhaseResults newSearchPhaseResults(SearchProgressListener listener, + SearchRequest request, + int numShards) { SearchSourceBuilder source = request.source(); boolean isScrollRequest = request.scroll() != null; final boolean hasAggs = source != null && source.aggregations() != null; @@ -688,14 +705,30 @@ ArraySearchPhaseResults newSearchPhaseResults(SearchRequest r // no incremental reduce if scroll is used - we only hit a single shard or sometimes more... 
if (request.getBatchedReduceSize() < numShards) { // only use this if there are aggs and if there are more shards than we should reduce at once - return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs, + return new QueryPhaseResultConsumer(listener, this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs, trackTotalHitsUpTo, request.isFinalReduce()); } } return new ArraySearchPhaseResults(numShards) { + @Override + void consumeResult(SearchPhaseResult result) { + super.consumeResult(result); + listener.onQueryResult(result.queryResult()); + } + + @Override + void consumeShardFailure(int shardIndex, Exception exc) { + super.consumeShardFailure(shardIndex, exc); + listener.onQueryFailure(shardIndex, exc); + } + @Override ReducedQueryPhase reduce() { - return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHitsUpTo, request.isFinalReduce()); + List resultList = results.asList(); + final ReducedQueryPhase reducePhase = + reducedQueryPhase(resultList, isScrollRequest, trackTotalHitsUpTo, request.isFinalReduce()); + listener.onReduce(listener.searchShards(resultList), reducePhase.totalHits, reducePhase.aggregations); + return reducePhase; } }; } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseResults.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseResults.java index e81cf4b74e234..f336aa6d317b3 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseResults.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseResults.java @@ -56,7 +56,7 @@ final int getNumShards() { */ abstract boolean hasResult(int shardIndex); - void consumeShardFailure(int shardIndex) {} + void consumeShardFailure(int shardIndex, Exception exc) {} AtomicArray getAtomicArray() { throw new UnsupportedOperationException(); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchProgressActionListener.java b/server/src/main/java/org/elasticsearch/action/search/SearchProgressActionListener.java new file mode 100644 index 0000000000000..508b059c95952 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/search/SearchProgressActionListener.java @@ -0,0 +1,29 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.action.ActionListener; + +/** + * An {@link ActionListener} for search requests that allows to track progress of the {@link SearchAction}. + * See {@link SearchProgressListener}. 
+ */ +public abstract class SearchProgressActionListener extends SearchProgressListener implements ActionListener { +} diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java b/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java new file mode 100644 index 0000000000000..1ff58a09fa1f4 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java @@ -0,0 +1,111 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.apache.lucene.search.TotalHits; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.fetch.FetchSearchResult; +import org.elasticsearch.search.query.QuerySearchResult; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * A listener that allows to track progress of the {@link SearchAction}. + */ +abstract class SearchProgressListener { + /** + * Executed when shards are ready to be queried. + * + * @param shards The list of shards to query. + * @param fetchPhase true if the search needs a fetch phase, false otherwise. + **/ + public void onListShards(List shards, boolean fetchPhase) {} + + /** + * Executed when a shard returns a query result. + * + * @param result The query result. + */ + public void onQueryResult(QuerySearchResult result) {} + + /** + * Executed when a shard reports a query failure. + * + * @param shardIndex The index of the shard in the list provided by onListShards. + * @param exc The cause of the failure. + */ + public void onQueryFailure(int shardIndex, Exception exc) { } + + /** + * Executed when a partial reduce is created. The number of partial reduce can be controlled via + * {@link SearchRequest#setBatchedReduceSize(int)}. + * + * @param shards The list of shards that are part of this reduce. + * @param totalHits The total number of hits in this reduce. + * @param aggs The partial result for aggregations. + * @param version The version number for this reduce. + */ + public void onPartialReduce(List shards, TotalHits totalHits, InternalAggregations aggs, int version) {} + + /** + * Executed once when the final reduce is created. + * + * @param shards The list of shards that are part of this reduce. + * @param totalHits The total number of hits in this reduce. + * @param aggs The final result for aggregations. + */ + public void onReduce(List shards, TotalHits totalHits, InternalAggregations aggs) {} + + /** + * Executed when a shard returns a query result. + * + * @param result The fetch result. 
+ */ + public void onFetchResult(FetchSearchResult result) {} + + /** + * Executed when a shard reports a fetch failure. + * + * @param shardIndex The index of the shard in the list provided by onListShards. + * @param exc The cause of the failure. + */ + public void onFetchFailure(int shardIndex, Exception exc) {} + + final List searchShards(List results) { + return results.stream() + .filter(Objects::nonNull) + .map(SearchPhaseResult::getSearchShardTarget) + .map(e -> new SearchShard(e.getClusterAlias(), e.getShardId())) + .collect(Collectors.toList()); + } + + final List searchShards(GroupShardsIterator its) { + return StreamSupport.stream(its.spliterator(), false) + .map(e -> new SearchShard(e.getClusterAlias(), e.shardId())) + .collect(Collectors.toList()); + } + + public static final SearchProgressListener NOOP = new SearchProgressListener() {}; +} diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index bbd84011de00b..6ace48a7fb551 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.transport.Transport; @@ -45,8 +46,12 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { + private final String clusterAlias; + private final ShardId shardId; + + SearchShard(String clusterAlias, ShardId shardId) { + this.clusterAlias = clusterAlias; + this.shardId = shardId; + } + + public String getClusterAlias() { + return clusterAlias; + } + + public ShardId getShardId() { + return shardId; + } + + @Override + public int compareTo(SearchShard o) { + int cmp = Objects.compare(clusterAlias, o.clusterAlias, + Comparator.nullsFirst(Comparator.naturalOrder())); + return cmp != 0 ? cmp : shardId.compareTo(o.shardId); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SearchShard that = (SearchShard) o; + return Objects.equals(clusterAlias, that.clusterAlias) && + shardId.equals(that.shardId); + } + + @Override + public int hashCode() { + return Objects.hash(clusterAlias, shardId); + } + + @Override + public String toString() { + return "SearchShard{" + + "clusterAlias='" + clusterAlias + '\'' + + ", shardId=" + shardId + + '}'; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTask.java b/server/src/main/java/org/elasticsearch/action/search/SearchTask.java index 8e3ec00283b51..97247e443bb64 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTask.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTask.java @@ -28,14 +28,28 @@ * Task storing information about a currently running {@link SearchRequest}. 
*/ public class SearchTask extends CancellableTask { + private SearchProgressListener progressListener = SearchProgressListener.NOOP; public SearchTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { super(id, type, action, description, parentTaskId, headers); } + /** + * Attach a {@link SearchProgressListener} to this task. + */ + public void setProgressListener(SearchProgressListener progressListener) { + this.progressListener = progressListener; + } + + /** + * Return the {@link SearchProgressListener} attached to this task. + */ + public SearchProgressListener getProgressListener() { + return progressListener; + } + @Override public boolean shouldCancelChildrenOnCancellation() { return true; } - } diff --git a/server/src/main/java/org/elasticsearch/client/node/NodeClient.java b/server/src/main/java/org/elasticsearch/client/node/NodeClient.java index 091e0cdf63b89..f28beb847bfb6 100644 --- a/server/src/main/java/org/elasticsearch/client/node/NodeClient.java +++ b/server/src/main/java/org/elasticsearch/client/node/NodeClient.java @@ -23,6 +23,12 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchProgressActionListener; +import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.client.Client; import org.elasticsearch.client.support.AbstractClient; @@ -100,6 +106,38 @@ > Task executeLocally(ActionType action, Request request, TaskListener listener::onResponse, listener::onFailure); } + /** + * Execute a {@link SearchRequest} locally and track the progress of the request through + * a {@link SearchProgressActionListener}. + */ + public SearchTask executeSearchLocally(SearchRequest request, SearchProgressActionListener listener) { + // we cannot track the progress if remote cluster requests are splitted. + request.setCcsMinimizeRoundtrips(false); + TransportSearchAction action = (TransportSearchAction) actions.get(SearchAction.INSTANCE); + SearchTask task = (SearchTask) taskManager.register("transport", action.actionName, request); + task.setProgressListener(listener); + action.execute(task, request, new ActionListener<>() { + @Override + public void onResponse(SearchResponse response) { + try { + taskManager.unregister(task); + } finally { + listener.onResponse(response); + } + } + + @Override + public void onFailure(Exception e) { + try { + taskManager.unregister(task); + } finally { + listener.onFailure(e); + } + } + }); + return task; + } + /** * The id of the local {@link DiscoveryNode}. Useful for generating task ids from tasks returned by * {@link #executeLocally(ActionType, ActionRequest, TaskListener)}. 
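To make the shard-level callbacks wired up above concrete, here is a rough sketch of a listener that reports the fraction of shards that have completed the query phase. It is illustrative only and not part of this change: the class name and the percentage heuristic are invented for the example, and fetch-phase progress is ignored for simplicity. Such a listener would be handed to NodeClient#executeSearchLocally, as in the snippet in the commit message above.

    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.elasticsearch.action.search.SearchProgressActionListener;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.action.search.SearchShard;
    import org.elasticsearch.search.query.QuerySearchResult;

    public class PercentageProgressListener extends SearchProgressActionListener {
        private final AtomicInteger totalShards = new AtomicInteger();
        private final AtomicInteger completedShards = new AtomicInteger();

        @Override
        public void onListShards(List<SearchShard> shards, boolean fetchPhase) {
            // remember how many shards will be queried
            totalShards.set(shards.size());
        }

        @Override
        public void onQueryResult(QuerySearchResult result) {
            completedShards.incrementAndGet();
        }

        @Override
        public void onQueryFailure(int shardIndex, Exception exc) {
            // failed shards also count as "done" for progress purposes
            completedShards.incrementAndGet();
        }

        /** Rough percentage of shards that have finished (or failed) the query phase. */
        public int percentQueried() {
            int total = totalShards.get();
            return total == 0 ? 0 : (completedShards.get() * 100) / total;
        }

        @Override
        public void onResponse(SearchResponse response) {
            // the search is complete
        }

        @Override
        public void onFailure(Exception e) {
            // the search failed as a whole
        }
    }
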
diff --git a/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java b/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java index 86d7a7ff7b5dc..6c9058812fd29 100644 --- a/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java @@ -23,6 +23,7 @@ import org.apache.lucene.search.TotalHits; import org.apache.lucene.store.MockDirectoryWrapper; import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.index.shard.ShardId; @@ -42,17 +43,21 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import static org.elasticsearch.action.search.SearchProgressListener.NOOP; + public class FetchSearchPhaseTests extends ESTestCase { public void testShortcutQueryAndFetchOptimization() { SearchPhaseController controller = new SearchPhaseController( (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b)); MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1); - ArraySearchPhaseResults results = controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 1); + ArraySearchPhaseResults results = controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 1); boolean hasHits = randomBoolean(); final int numHits; if (hasHits) { QuerySearchResult queryResult = new QuerySearchResult(); + queryResult.setSearchShardTarget(new SearchShardTarget("node0", + new ShardId("index", "index", 0), null, OriginalIndices.NONE)); queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] {new ScoreDoc(42, 1.0F)}), 1.0F), new DocValueFormat[0]); queryResult.size(1); @@ -89,7 +94,7 @@ public void testFetchTwoDocument() { MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); SearchPhaseController controller = new SearchPhaseController( (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b)); - ArraySearchPhaseResults results = controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2); + ArraySearchPhaseResults results = controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 2); int resultSetSize = randomIntBetween(2, 10); QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE)); @@ -147,7 +152,7 @@ public void testFailFetchOneDoc() { SearchPhaseController controller = new SearchPhaseController( (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b)); ArraySearchPhaseResults results = - controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2); + controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 2); int resultSetSize = randomIntBetween(2, 10); QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE)); @@ -208,7 +213,8 @@ public void testFetchDocsConcurrently() throws InterruptedException { SearchPhaseController controller = new SearchPhaseController( (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b)); MockSearchPhaseContext 
mockSearchPhaseContext = new MockSearchPhaseContext(numHits); - ArraySearchPhaseResults results = controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), numHits); + ArraySearchPhaseResults results = controller.newSearchPhaseResults(NOOP, + mockSearchPhaseContext.getRequest(), numHits); for (int i = 0; i < numHits; i++) { QuerySearchResult queryResult = new QuerySearchResult(i, new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE)); @@ -265,7 +271,7 @@ public void testExceptionFailsPhase() { SearchPhaseController controller = new SearchPhaseController( (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b)); ArraySearchPhaseResults results = - controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2); + controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 2); int resultSetSize = randomIntBetween(2, 10); QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE)); @@ -321,7 +327,7 @@ public void testCleanupIrrelevantContexts() { // contexts that are not fetched s SearchPhaseController controller = new SearchPhaseController( (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b)); ArraySearchPhaseResults results = - controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2); + controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 2); int resultSetSize = 1; QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE)); diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchActionProgressListenerIT.java b/server/src/test/java/org/elasticsearch/action/search/SearchActionProgressListenerIT.java new file mode 100644 index 0000000000000..8ce6c3118cc99 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/search/SearchActionProgressListenerIT.java @@ -0,0 +1,218 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.action.search; + +import org.apache.lucene.search.TotalHits; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.fetch.FetchSearchResult; +import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.search.sort.FieldSortBuilder; +import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.test.ESSingleNodeTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; + +public class SearchActionProgressListenerIT extends ESSingleNodeTestCase { + private List shards; + + public void setUp() throws Exception { + super.setUp(); + shards = createRandomIndices(client()); + } + + public void testSearchProgressSimple() throws Exception { + for (SearchType searchType : SearchType.values()) { + SearchRequest request = new SearchRequest("index-*") + .searchType(searchType) + .source(new SearchSourceBuilder().size(0)); + testCase((NodeClient) client(), request, shards, false); + } + } + + public void testSearchProgressWithHits() throws Exception { + for (SearchType searchType : SearchType.values()) { + SearchRequest request = new SearchRequest("index-*") + .searchType(searchType) + .source( + new SearchSourceBuilder() + .size(10) + ); + testCase((NodeClient) client(), request, shards, true); + } + } + + public void testSearchProgressWithAggs() throws Exception { + for (SearchType searchType : SearchType.values()) { + SearchRequest request = new SearchRequest("index-*") + .searchType(searchType) + .source( + new SearchSourceBuilder() + .size(0) + .aggregation(AggregationBuilders.max("max").field("number")) + ); + testCase((NodeClient) client(), request, shards, false); + } + } + + public void testSearchProgressWithHitsAndAggs() throws Exception { + for (SearchType searchType : SearchType.values()) { + SearchRequest request = new SearchRequest("index-*") + .searchType(searchType) + .source( + new SearchSourceBuilder() + .size(10) + .aggregation(AggregationBuilders.max("max").field("number")) + ); + testCase((NodeClient) client(), request, shards, true); + } + } + + public void testSearchProgressWithQuery() throws Exception { + for (SearchType searchType : SearchType.values()) { + SearchRequest request = new SearchRequest("index-*") + .searchType(searchType) + .source( + new SearchSourceBuilder() + .size(10) + .query(QueryBuilders.termQuery("foo", "bar")) + ); + testCase((NodeClient) client(), request, shards, true); + } + } + + public void testSearchProgressWithShardSort() throws Exception { + SearchRequest request = new SearchRequest("index-*") + .source( + new SearchSourceBuilder() + .size(0) + .sort(new FieldSortBuilder("number").order(SortOrder.DESC)) + ); + request.setPreFilterShardSize(1); + List sortShards = new 
ArrayList<>(shards); + Collections.sort(sortShards, Comparator.reverseOrder()); + testCase((NodeClient) client(), request, sortShards, false); + } + + private static void testCase(NodeClient client, SearchRequest request, + List expectedShards, boolean hasFetchPhase) throws InterruptedException { + AtomicInteger numQueryResults = new AtomicInteger(); + AtomicInteger numQueryFailures = new AtomicInteger(); + AtomicInteger numFetchResults = new AtomicInteger(); + AtomicInteger numFetchFailures = new AtomicInteger(); + AtomicInteger numReduces = new AtomicInteger(); + AtomicReference searchResponse = new AtomicReference<>(); + AtomicReference> shardsListener = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + SearchProgressActionListener listener = new SearchProgressActionListener() { + @Override + public void onListShards(List shards, boolean fetchPhase) { + shardsListener.set(shards); + assertEquals(fetchPhase, hasFetchPhase); + } + + @Override + public void onQueryResult(QuerySearchResult result) { + numQueryResults.incrementAndGet(); + } + + @Override + public void onQueryFailure(int shardNumber, Exception exc) { + numQueryFailures.incrementAndGet(); + } + + @Override + public void onFetchResult(FetchSearchResult result) { + numFetchResults.incrementAndGet(); + } + + @Override + public void onFetchFailure(int shardNumber, Exception exc) { + numFetchFailures.incrementAndGet(); + } + + @Override + public void onPartialReduce(List shards, TotalHits totalHits, InternalAggregations aggs, int version) { + numReduces.incrementAndGet(); + } + + @Override + public void onReduce(List shards, TotalHits totalHits, InternalAggregations aggs) { + numReduces.incrementAndGet(); + } + + @Override + public void onResponse(SearchResponse response) { + searchResponse.set(response); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(); + } + }; + client.executeSearchLocally(request, listener); + latch.await(); + + assertThat(shardsListener.get(), equalTo(expectedShards)); + assertThat(numQueryResults.get(), equalTo(searchResponse.get().getSuccessfulShards())); + assertThat(numQueryFailures.get(), equalTo(searchResponse.get().getFailedShards())); + if (hasFetchPhase) { + assertThat(numFetchResults.get(), equalTo(searchResponse.get().getSuccessfulShards())); + assertThat(numFetchFailures.get(), equalTo(0)); + } else { + assertThat(numFetchResults.get(), equalTo(0)); + assertThat(numFetchFailures.get(), equalTo(0)); + } + assertThat(numReduces.get(), equalTo(searchResponse.get().getNumReducePhases())); + } + + private static List createRandomIndices(Client client) { + int numIndices = randomIntBetween(3, 20); + for (int i = 0; i < numIndices; i++) { + String indexName = String.format("index-%03d" , i); + assertAcked(client.admin().indices().prepareCreate(indexName).get()); + client.prepareIndex(indexName).setSource("number", i, "foo", "bar").get(); + } + client.admin().indices().prepareRefresh("index-*").get(); + ClusterSearchShardsResponse resp = client.admin().cluster().prepareSearchShards("index-*").get(); + return Arrays.stream(resp.getGroups()) + .map(e -> new SearchShard(null, e.getShardId())) + .sorted() + .collect(Collectors.toList()); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java index a8f7e50845ace..955d775ecb8e6 100644 --- 
a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java @@ -68,9 +68,11 @@ import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.elasticsearch.action.search.SearchProgressListener.NOOP; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; @@ -339,7 +341,7 @@ public void testConsumer() { SearchRequest request = randomSearchRequest(); request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo"))); request.setBatchedReduceSize(bufferSize); - ArraySearchPhaseResults consumer = searchPhaseController.newSearchPhaseResults(request, 3); + ArraySearchPhaseResults consumer = searchPhaseController.newSearchPhaseResults(NOOP, request, 3); assertEquals(0, reductions.size()); QuerySearchResult result = new QuerySearchResult(0, new SearchShardTarget("node", new ShardId("a", "b", 0), null, OriginalIndices.NONE)); @@ -402,7 +404,7 @@ public void testConsumerConcurrently() throws InterruptedException { request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo"))); request.setBatchedReduceSize(bufferSize); ArraySearchPhaseResults consumer = - searchPhaseController.newSearchPhaseResults(request, expectedNumResults); + searchPhaseController.newSearchPhaseResults(NOOP, request, expectedNumResults); AtomicInteger max = new AtomicInteger(); Thread[] threads = new Thread[expectedNumResults]; for (int i = 0; i < expectedNumResults; i++) { @@ -449,7 +451,7 @@ public void testConsumerOnlyAggs() { request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")).size(0)); request.setBatchedReduceSize(bufferSize); ArraySearchPhaseResults consumer = - searchPhaseController.newSearchPhaseResults(request, expectedNumResults); + searchPhaseController.newSearchPhaseResults(NOOP, request, expectedNumResults); AtomicInteger max = new AtomicInteger(); for (int i = 0; i < expectedNumResults; i++) { int number = randomIntBetween(1, 1000); @@ -487,7 +489,7 @@ public void testConsumerOnlyHits() { } request.setBatchedReduceSize(bufferSize); ArraySearchPhaseResults consumer = - searchPhaseController.newSearchPhaseResults(request, expectedNumResults); + searchPhaseController.newSearchPhaseResults(NOOP, request, expectedNumResults); AtomicInteger max = new AtomicInteger(); for (int i = 0; i < expectedNumResults; i++) { int number = randomIntBetween(1, 1000); @@ -540,7 +542,7 @@ public void testNewSearchPhaseResults() { } request.setBatchedReduceSize(bufferSize); ArraySearchPhaseResults consumer - = searchPhaseController.newSearchPhaseResults(request, expectedNumResults); + = searchPhaseController.newSearchPhaseResults(NOOP, request, expectedNumResults); if ((hasAggs || hasTopDocs) && expectedNumResults > bufferSize) { assertThat("expectedNumResults: " + expectedNumResults + " bufferSize: " + bufferSize, consumer, instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class)); @@ -556,7 +558,7 @@ public void testReduceTopNWithFromOffset() { request.source(new SearchSourceBuilder().size(5).from(5)); request.setBatchedReduceSize(randomIntBetween(2, 4)); ArraySearchPhaseResults consumer = - searchPhaseController.newSearchPhaseResults(request, 4); 
+ searchPhaseController.newSearchPhaseResults(NOOP, request, 4); int score = 100; for (int i = 0; i < 4; i++) { QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new ShardId("a", "b", i), @@ -592,7 +594,7 @@ public void testConsumerSortByField() { int size = randomIntBetween(1, 10); request.setBatchedReduceSize(bufferSize); ArraySearchPhaseResults consumer = - searchPhaseController.newSearchPhaseResults(request, expectedNumResults); + searchPhaseController.newSearchPhaseResults(NOOP, request, expectedNumResults); AtomicInteger max = new AtomicInteger(); SortField[] sortFields = {new SortField("field", SortField.Type.INT, true)}; DocValueFormat[] docValueFormats = {DocValueFormat.RAW}; @@ -628,7 +630,7 @@ public void testConsumerFieldCollapsing() { int size = randomIntBetween(5, 10); request.setBatchedReduceSize(bufferSize); ArraySearchPhaseResults consumer = - searchPhaseController.newSearchPhaseResults(request, expectedNumResults); + searchPhaseController.newSearchPhaseResults(NOOP, request, expectedNumResults); SortField[] sortFields = {new SortField("field", SortField.Type.STRING)}; BytesRef a = new BytesRef("a"); BytesRef b = new BytesRef("b"); @@ -667,7 +669,7 @@ public void testConsumerSuggestions() { SearchRequest request = randomSearchRequest(); request.setBatchedReduceSize(bufferSize); ArraySearchPhaseResults consumer = - searchPhaseController.newSearchPhaseResults(request, expectedNumResults); + searchPhaseController.newSearchPhaseResults(NOOP, request, expectedNumResults); int maxScoreTerm = -1; int maxScorePhrase = -1; int maxScoreCompletion = -1; @@ -752,4 +754,87 @@ public void testConsumerSuggestions() { assertNull(reduce.sortedTopDocs.collapseField); assertNull(reduce.sortedTopDocs.collapseValues); } + + public void testProgressListener() throws InterruptedException { + int expectedNumResults = randomIntBetween(10, 100); + for (int bufferSize : new int[] {expectedNumResults, expectedNumResults/2, expectedNumResults/4, 2}) { + SearchRequest request = randomSearchRequest(); + request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo"))); + request.setBatchedReduceSize(bufferSize); + AtomicInteger numQueryResultListener = new AtomicInteger(); + AtomicInteger numQueryFailureListener = new AtomicInteger(); + AtomicInteger numReduceListener = new AtomicInteger(); + AtomicReference finalAggsListener = new AtomicReference<>(); + AtomicReference totalHitsListener = new AtomicReference<>(); + SearchProgressListener progressListener = new SearchProgressListener() { + @Override + public void onQueryResult(QuerySearchResult result) { + numQueryResultListener.incrementAndGet(); + } + + @Override + public void onQueryFailure(int shardId, Exception exc) { + numQueryFailureListener.incrementAndGet(); + } + + @Override + public void onPartialReduce(List shards, TotalHits totalHits, InternalAggregations aggs, int version) { + assertEquals(numReduceListener.incrementAndGet(), version); + } + + @Override + public void onReduce(List shards, TotalHits totalHits, InternalAggregations aggs) { + totalHitsListener.set(totalHits); + finalAggsListener.set(aggs); + numReduceListener.incrementAndGet(); + } + }; + ArraySearchPhaseResults consumer = + searchPhaseController.newSearchPhaseResults(progressListener, request, expectedNumResults); + AtomicInteger max = new AtomicInteger(); + Thread[] threads = new Thread[expectedNumResults]; + for (int i = 0; i < expectedNumResults; i++) { + int id = i; + threads[i] = new Thread(() -> { + int number = 
randomIntBetween(1, 1000); + max.updateAndGet(prev -> Math.max(prev, number)); + QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new ShardId("a", "b", id), + null, OriginalIndices.NONE)); + result.topDocs(new TopDocsAndMaxScore( + new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[]{new ScoreDoc(0, number)}), number), + new DocValueFormat[0]); + InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", (double) number, + DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap()))); + result.aggregations(aggs); + result.setShardIndex(id); + result.size(1); + consumer.consumeResult(result); + }); + threads[i].start(); + } + for (int i = 0; i < expectedNumResults; i++) { + threads[i].join(); + } + SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce(); + assertFinalReduction(request); + InternalMax internalMax = (InternalMax) reduce.aggregations.asList().get(0); + assertEquals(max.get(), internalMax.getValue(), 0.0D); + assertEquals(1, reduce.sortedTopDocs.scoreDocs.length); + assertEquals(max.get(), reduce.maxScore, 0.0f); + assertEquals(expectedNumResults, reduce.totalHits.value); + assertEquals(max.get(), reduce.sortedTopDocs.scoreDocs[0].score, 0.0f); + assertFalse(reduce.sortedTopDocs.isSortedByField); + assertNull(reduce.sortedTopDocs.sortFields); + assertNull(reduce.sortedTopDocs.collapseField); + assertNull(reduce.sortedTopDocs.collapseValues); + + assertEquals(reduce.aggregations, finalAggsListener.get()); + assertEquals(reduce.totalHits, totalHitsListener.get()); + + assertEquals(expectedNumResults, numQueryResultListener.get()); + assertEquals(0, numQueryFailureListener.get()); + assertEquals(numReduceListener.get(), reduce.numReducePhases); + } + } + } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchIT.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchIT.java index ffc41cab847b9..e16227c4ac6f8 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchIT.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchIT.java @@ -69,5 +69,4 @@ public void testShardCountLimit() throws Exception { TransportSearchAction.SHARD_COUNT_LIMIT_SETTING.getKey(), null))); } } - } From 6bc9f738b88b5fd541b63d0a4dbbb30e2d931073 Mon Sep 17 00:00:00 2001 From: jimczi Date: Fri, 22 Nov 2019 01:09:24 +0100 Subject: [PATCH 04/12] iter --- .../org/elasticsearch/action/search/FetchSearchPhase.java | 1 - .../elasticsearch/action/search/FetchSearchPhaseTests.java | 1 - ...ssListenerIT.java => SearchProgressActionListenerIT.java} | 5 +++-- 3 files changed, 3 insertions(+), 4 deletions(-) rename server/src/test/java/org/elasticsearch/action/search/{SearchActionProgressListenerIT.java => SearchProgressActionListenerIT.java} (98%) diff --git a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java index 874c15ff83c80..78b2162a3978e 100644 --- a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java @@ -34,7 +34,6 @@ import org.elasticsearch.transport.Transport; import java.io.IOException; -import java.util.Collections; import java.util.List; import java.util.function.BiFunction; diff --git a/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java 
b/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java index 6c9058812fd29..5eec29dbf8039 100644 --- a/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java @@ -23,7 +23,6 @@ import org.apache.lucene.search.TotalHits; import org.apache.lucene.store.MockDirectoryWrapper; import org.elasticsearch.action.OriginalIndices; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.index.shard.ShardId; diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchActionProgressListenerIT.java b/server/src/test/java/org/elasticsearch/action/search/SearchProgressActionListenerIT.java similarity index 98% rename from server/src/test/java/org/elasticsearch/action/search/SearchActionProgressListenerIT.java rename to server/src/test/java/org/elasticsearch/action/search/SearchProgressActionListenerIT.java index 8ce6c3118cc99..dfa375dd27143 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchActionProgressListenerIT.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchProgressActionListenerIT.java @@ -38,6 +38,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Locale; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -46,7 +47,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; -public class SearchActionProgressListenerIT extends ESSingleNodeTestCase { +public class SearchProgressActionListenerIT extends ESSingleNodeTestCase { private List shards; public void setUp() throws Exception { @@ -204,7 +205,7 @@ public void onFailure(Exception e) { private static List createRandomIndices(Client client) { int numIndices = randomIntBetween(3, 20); for (int i = 0; i < numIndices; i++) { - String indexName = String.format("index-%03d" , i); + String indexName = String.format(Locale.ROOT, "index-%03d" , i); assertAcked(client.admin().indices().prepareCreate(indexName).get()); client.prepareIndex(indexName).setSource("number", i, "foo", "bar").get(); } From e12834e095e0aab55876f2170e8569799b7509d3 Mon Sep 17 00:00:00 2001 From: jimczi Date: Fri, 22 Nov 2019 08:36:53 +0100 Subject: [PATCH 05/12] fix npe in fetch phase --- .../elasticsearch/action/search/FetchSearchPhase.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java index 78b2162a3978e..dcdeafcfdf0cc 100644 --- a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.fetch.FetchSearchResult; @@ -135,11 +136,15 @@ private void innerRun() throws IOException { // we do this as we go since it 
will free up resources and passing on the request on the // transport layer is cheap. releaseIrrelevantSearchContext(queryResult.queryResult()); + + // empty result + FetchSearchResult result = new FetchSearchResult(); + result.setSearchShardTarget(queryResult.getSearchShardTarget()); + result.hits(SearchHits.empty()); + progressListener.onFetchResult(result); } // in any case we count down this result since we don't talk to this shard anymore counter.countDown(); - // empty result - progressListener.onFetchResult(queryResult.fetchResult()); } else { SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget(); Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), From 2141172066e7e10d017e6b6c5ab6d1199f9d4cbc Mon Sep 17 00:00:00 2001 From: jimczi Date: Fri, 22 Nov 2019 09:30:50 +0100 Subject: [PATCH 06/12] iter --- .../elasticsearch/action/search/SearchShard.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchShard.java b/server/src/main/java/org/elasticsearch/action/search/SearchShard.java index e840d31a76588..91dcb9f458708 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchShard.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchShard.java @@ -19,24 +19,38 @@ package org.elasticsearch.action.search; +import org.elasticsearch.common.Nullable; import org.elasticsearch.index.shard.ShardId; import java.util.Comparator; import java.util.Objects; +/** + * A class that encapsulates the {@link ShardId} and the cluster alias + * of a shard used during the search action. + */ public class SearchShard implements Comparable { + @Nullable private final String clusterAlias; private final ShardId shardId; - SearchShard(String clusterAlias, ShardId shardId) { + SearchShard(@Nullable String clusterAlias, ShardId shardId) { this.clusterAlias = clusterAlias; this.shardId = shardId; } + /** + * Return the cluster alias if the shard is on a remote cluster and null + * otherwise (local). + */ public String getClusterAlias() { return clusterAlias; } + /** + * Return the {@link ShardId} of this shard. 
+ */ + @Nullable public ShardId getShardId() { return shardId; } From bd3e09f94403a24e7f8569bae316397683c58683 Mon Sep 17 00:00:00 2001 From: jimczi Date: Fri, 22 Nov 2019 10:36:19 +0100 Subject: [PATCH 07/12] iter --- .../action/search/DfsQueryPhase.java | 2 +- .../action/search/FetchSearchPhase.java | 6 +-- .../action/search/SearchPhaseController.java | 11 +++-- .../action/search/SearchProgressListener.java | 48 +++++++++++++++++++ .../SearchQueryThenFetchAsyncAction.java | 3 +- 5 files changed, 60 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java b/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java index a64816d29a5ef..ff766987f5464 100644 --- a/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java @@ -73,7 +73,7 @@ public void run() throws IOException { resultList.size(), () -> context.executeNextPhase(this, nextPhaseFactory.apply(queryResult)), context); final SearchSourceBuilder sourceBuilder = context.getRequest().source(); - progressListener.onListShards(progressListener.searchShards(resultList), sourceBuilder == null || sourceBuilder.size() != 0); + progressListener.notifyListShards(progressListener.searchShards(resultList), sourceBuilder == null || sourceBuilder.size() != 0); for (final DfsSearchResult dfsResult : resultList) { final SearchShardTarget searchShardTarget = dfsResult.getSearchShardTarget(); Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId()); diff --git a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java index dcdeafcfdf0cc..543ccbf29e62b 100644 --- a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java @@ -137,11 +137,11 @@ private void innerRun() throws IOException { // transport layer is cheap. 
releaseIrrelevantSearchContext(queryResult.queryResult()); - // empty result + // propagate empty result to the progress listener FetchSearchResult result = new FetchSearchResult(); result.setSearchShardTarget(queryResult.getSearchShardTarget()); result.hits(SearchHits.empty()); - progressListener.onFetchResult(result); + progressListener.notifyFetchResult(result); } // in any case we count down this result since we don't talk to this shard anymore counter.countDown(); @@ -181,7 +181,7 @@ public void innerOnResponse(FetchSearchResult result) { context.onPhaseFailure(FetchSearchPhase.this, "", e); } if (success) { - progressListener.onFetchResult(result); + progressListener.notifyFetchResult(result); } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index f630a6a100a03..17a3d4e3e99d9 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -618,7 +618,7 @@ public void consumeResult(SearchPhaseResult result) { super.consumeResult(result); QuerySearchResult queryResult = result.queryResult(); consumeInternal(queryResult); - progressListener.onQueryResult(queryResult); + progressListener.notifyQueryResult(queryResult); } private synchronized void consumeInternal(QuerySearchResult querySearchResult) { @@ -639,7 +639,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) { numReducePhases++; index = 1; if (hasAggs) { - progressListener.onPartialReduce(progressListener.searchShards(results.asList()), + progressListener.notifyPartialReduce(progressListener.searchShards(results.asList()), topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases); } } @@ -667,7 +667,8 @@ private synchronized List getRemainingTopDocs() { public ReducedQueryPhase reduce() { ReducedQueryPhase reducePhase = controller.reducedQueryPhase(results.asList(), getRemainingAggs(), getRemainingTopDocs(), topDocsStats, numReducePhases, false, performFinalReduce); - progressListener.onReduce(progressListener.searchShards(results.asList()), reducePhase.totalHits, reducePhase.aggregations); + progressListener.notifyReduce(progressListener.searchShards(results.asList()), + reducePhase.totalHits, reducePhase.aggregations); return reducePhase; } @@ -713,7 +714,7 @@ ArraySearchPhaseResults newSearchPhaseResults(SearchProgressL @Override void consumeResult(SearchPhaseResult result) { super.consumeResult(result); - listener.onQueryResult(result.queryResult()); + listener.notifyQueryResult(result.queryResult()); } @Override @@ -727,7 +728,7 @@ ReducedQueryPhase reduce() { List resultList = results.asList(); final ReducedQueryPhase reducePhase = reducedQueryPhase(resultList, isScrollRequest, trackTotalHitsUpTo, request.isFinalReduce()); - listener.onReduce(listener.searchShards(resultList), reducePhase.totalHits, reducePhase.aggregations); + listener.notifyReduce(listener.searchShards(resultList), reducePhase.totalHits, reducePhase.aggregations); return reducePhase; } }; diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java b/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java index 1ff58a09fa1f4..d44e911d6358a 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java @@ 
-19,6 +19,9 @@ package org.elasticsearch.action.search; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.search.TotalHits; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.search.SearchPhaseResult; @@ -35,6 +38,8 @@ * A listener that allows to track progress of the {@link SearchAction}. */ abstract class SearchProgressListener { + private static final Logger logger = LogManager.getLogger(SearchProgressListener.class); + /** * Executed when shards are ready to be queried. * @@ -93,6 +98,49 @@ public void onFetchResult(FetchSearchResult result) {} */ public void onFetchFailure(int shardIndex, Exception exc) {} + final void notifyListShards(List shards, boolean fetchPhase) { + try { + onListShards(shards, fetchPhase); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on list shards", e)); + } + } + + final void notifyQueryResult(QuerySearchResult result) { + try { + onQueryResult(result); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}] Failed to execute progress listener on query result", + result.getSearchShardTarget().getShardId().getId()), e); + } + } + + final void notifyPartialReduce(List shards, TotalHits totalHits, InternalAggregations aggs, int version) { + try { + onPartialReduce(shards, totalHits, aggs, version); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on partial reduce", e)); + } + } + + final void notifyReduce(List shards, TotalHits totalHits, InternalAggregations aggs) { + try { + onReduce(shards, totalHits, aggs); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on reduce", e)); + } + } + + public void notifyFetchResult(FetchSearchResult result) { + try { + onFetchResult(result); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}] Failed to execute progress listener on fetch result", + result.getSearchShardTarget().getShardId().getId()), e); + } + } + + final List searchShards(List results) { return results.stream() .filter(Objects::nonNull) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 6ace48a7fb551..b2c9039c427b7 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -51,7 +51,8 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction Date: Fri, 22 Nov 2019 11:12:27 +0100 Subject: [PATCH 08/12] fix message --- .../elasticsearch/action/search/SearchProgressListener.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java b/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java index d44e911d6358a..af74d6d11af01 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java @@ -102,7 +102,7 @@ final void notifyListShards(List shards, boolean fetchPhase) { try { onListShards(shards, fetchPhase); } catch (Exception e) { - 
logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on list shards", e)); + logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on list shards"), e); } } @@ -119,7 +119,7 @@ final void notifyPartialReduce(List shards, TotalHits totalHits, In try { onPartialReduce(shards, totalHits, aggs, version); } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on partial reduce", e)); + logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on partial reduce"), e); } } @@ -127,7 +127,7 @@ final void notifyReduce(List shards, TotalHits totalHits, InternalA try { onReduce(shards, totalHits, aggs); } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on reduce", e)); + logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on reduce"), e); } } From 3c85aa9e86f4cc9ea1951659465bf9a673b0c06c Mon Sep 17 00:00:00 2001 From: jimczi Date: Mon, 25 Nov 2019 11:35:07 +0100 Subject: [PATCH 09/12] iter --- .../action/search/SearchProgressListener.java | 7 +++---- .../java/org/elasticsearch/action/search/SearchShard.java | 7 +++---- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java b/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java index af74d6d11af01..38dae4697ebda 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java @@ -40,6 +40,8 @@ abstract class SearchProgressListener { private static final Logger logger = LogManager.getLogger(SearchProgressListener.class); + public static final SearchProgressListener NOOP = new SearchProgressListener() {}; + /** * Executed when shards are ready to be queried. * @@ -131,7 +133,7 @@ final void notifyReduce(List shards, TotalHits totalHits, InternalA } } - public void notifyFetchResult(FetchSearchResult result) { + final void notifyFetchResult(FetchSearchResult result) { try { onFetchResult(result); } catch (Exception e) { @@ -140,7 +142,6 @@ public void notifyFetchResult(FetchSearchResult result) { } } - final List searchShards(List results) { return results.stream() .filter(Objects::nonNull) @@ -154,6 +155,4 @@ final List searchShards(GroupShardsIterator it .map(e -> new SearchShard(e.getClusterAlias(), e.shardId())) .collect(Collectors.toList()); } - - public static final SearchProgressListener NOOP = new SearchProgressListener() {}; } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchShard.java b/server/src/main/java/org/elasticsearch/action/search/SearchShard.java index 91dcb9f458708..ff60d2f52e466 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchShard.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchShard.java @@ -57,8 +57,7 @@ public ShardId getShardId() { @Override public int compareTo(SearchShard o) { - int cmp = Objects.compare(clusterAlias, o.clusterAlias, - Comparator.nullsFirst(Comparator.naturalOrder())); + int cmp = Objects.compare(clusterAlias, o.clusterAlias, Comparator.nullsFirst(Comparator.naturalOrder())); return cmp != 0 ? 
cmp : shardId.compareTo(o.shardId); } @@ -67,8 +66,8 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; SearchShard that = (SearchShard) o; - return Objects.equals(clusterAlias, that.clusterAlias) && - shardId.equals(that.shardId); + return Objects.equals(clusterAlias, that.clusterAlias) + && shardId.equals(that.shardId); } @Override From 8b67014aac44fe47384294223a8ac67ac1df5113 Mon Sep 17 00:00:00 2001 From: jimczi Date: Mon, 25 Nov 2019 15:38:53 +0100 Subject: [PATCH 10/12] Ensure fetch results are reported before counting down the requests --- .../action/search/FetchSearchPhase.java | 4 +-- .../action/search/SearchPhaseController.java | 4 +-- .../action/search/SearchProgressListener.java | 25 ++++++++++--------- .../search/SearchPhaseControllerTests.java | 7 ++++-- .../SearchProgressActionListenerIT.java | 15 ++++++----- 5 files changed, 31 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java index 543ccbf29e62b..3ce4372dfd7ec 100644 --- a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java @@ -141,8 +141,8 @@ private void innerRun() throws IOException { FetchSearchResult result = new FetchSearchResult(); result.setSearchShardTarget(queryResult.getSearchShardTarget()); result.hits(SearchHits.empty()); - progressListener.notifyFetchResult(result); } + progressListener.notifyFetchResult(i); // in any case we count down this result since we don't talk to this shard anymore counter.countDown(); } else { @@ -181,7 +181,7 @@ public void innerOnResponse(FetchSearchResult result) { context.onPhaseFailure(FetchSearchPhase.this, "", e); } if (success) { - progressListener.notifyFetchResult(result); + progressListener.notifyFetchResult(shardIndex); } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 17a3d4e3e99d9..a423ead7c9408 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -618,7 +618,7 @@ public void consumeResult(SearchPhaseResult result) { super.consumeResult(result); QuerySearchResult queryResult = result.queryResult(); consumeInternal(queryResult); - progressListener.notifyQueryResult(queryResult); + progressListener.notifyQueryResult(queryResult.getShardIndex()); } private synchronized void consumeInternal(QuerySearchResult querySearchResult) { @@ -714,7 +714,7 @@ ArraySearchPhaseResults newSearchPhaseResults(SearchProgressL @Override void consumeResult(SearchPhaseResult result) { super.consumeResult(result); - listener.notifyQueryResult(result.queryResult()); + listener.notifyQueryResult(result.queryResult().getShardIndex()); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java b/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java index 38dae4697ebda..95328fd08b486 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java @@ -26,8 +26,6 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator; import 
org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.aggregations.InternalAggregations; -import org.elasticsearch.search.fetch.FetchSearchResult; -import org.elasticsearch.search.query.QuerySearchResult; import java.util.List; import java.util.Objects; @@ -42,6 +40,8 @@ abstract class SearchProgressListener { public static final SearchProgressListener NOOP = new SearchProgressListener() {}; + private List shards; + /** * Executed when shards are ready to be queried. * @@ -53,9 +53,9 @@ public void onListShards(List shards, boolean fetchPhase) {} /** * Executed when a shard returns a query result. * - * @param result The query result. + * @param shardIndex The index of the shard in the list provided by onListShards. */ - public void onQueryResult(QuerySearchResult result) {} + public void onQueryResult(int shardIndex) {} /** * Executed when a shard reports a query failure. @@ -88,9 +88,9 @@ public void onReduce(List shards, TotalHits totalHits, InternalAggr /** * Executed when a shard returns a query result. * - * @param result The fetch result. + * @param shardIndex The index of the shard in the list provided by onListShards. */ - public void onFetchResult(FetchSearchResult result) {} + public void onFetchResult(int shardIndex) {} /** * Executed when a shard reports a fetch failure. @@ -101,6 +101,7 @@ public void onFetchResult(FetchSearchResult result) {} public void onFetchFailure(int shardIndex, Exception exc) {} final void notifyListShards(List shards, boolean fetchPhase) { + this.shards = shards; try { onListShards(shards, fetchPhase); } catch (Exception e) { @@ -108,12 +109,12 @@ final void notifyListShards(List shards, boolean fetchPhase) { } } - final void notifyQueryResult(QuerySearchResult result) { + final void notifyQueryResult(int shardIndex) { try { - onQueryResult(result); + onQueryResult(shardIndex); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("[{}] Failed to execute progress listener on query result", - result.getSearchShardTarget().getShardId().getId()), e); + shards.get(shardIndex)), e); } } @@ -133,12 +134,12 @@ final void notifyReduce(List shards, TotalHits totalHits, InternalA } } - final void notifyFetchResult(FetchSearchResult result) { + final void notifyFetchResult(int shardIndex) { try { - onFetchResult(result); + onFetchResult(shardIndex); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("[{}] Failed to execute progress listener on fetch result", - result.getSearchShardTarget().getShardId().getId()), e); + shards.get(shardIndex)), e); } } diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java index 955d775ecb8e6..641d5bf2c59b4 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java @@ -76,6 +76,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; @@ -768,12 +769,14 @@ public void testProgressListener() throws InterruptedException { AtomicReference totalHitsListener = new AtomicReference<>(); SearchProgressListener progressListener = new SearchProgressListener() { @Override - public void 
onQueryResult(QuerySearchResult result) { + public void onQueryResult(int shardIndex) { + assertThat(shardIndex, lessThan(expectedNumResults)); numQueryResultListener.incrementAndGet(); } @Override - public void onQueryFailure(int shardId, Exception exc) { + public void onQueryFailure(int shardIndex, Exception exc) { + assertThat(shardIndex, lessThan(expectedNumResults)); numQueryFailureListener.incrementAndGet(); } diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchProgressActionListenerIT.java b/server/src/test/java/org/elasticsearch/action/search/SearchProgressActionListenerIT.java index dfa375dd27143..1a6bb6263fb6b 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchProgressActionListenerIT.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchProgressActionListenerIT.java @@ -27,8 +27,6 @@ import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.search.fetch.FetchSearchResult; -import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.ESSingleNodeTestCase; @@ -46,6 +44,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; public class SearchProgressActionListenerIT extends ESSingleNodeTestCase { private List shards; @@ -146,22 +145,26 @@ public void onListShards(List shards, boolean fetchPhase) { } @Override - public void onQueryResult(QuerySearchResult result) { + public void onQueryResult(int shardIndex) { + assertThat(shardIndex, lessThan(shardsListener.get().size())); numQueryResults.incrementAndGet(); } @Override - public void onQueryFailure(int shardNumber, Exception exc) { + public void onQueryFailure(int shardIndex, Exception exc) { + assertThat(shardIndex, lessThan(shardsListener.get().size())); numQueryFailures.incrementAndGet(); } @Override - public void onFetchResult(FetchSearchResult result) { + public void onFetchResult(int shardIndex) { + assertThat(shardIndex, lessThan(shardsListener.get().size())); numFetchResults.incrementAndGet(); } @Override - public void onFetchFailure(int shardNumber, Exception exc) { + public void onFetchFailure(int shardIndex, Exception exc) { + assertThat(shardIndex, lessThan(shardsListener.get().size())); numFetchFailures.incrementAndGet(); } From 755bab2288813016e82aacd32a964d22177f7aae Mon Sep 17 00:00:00 2001 From: jimczi Date: Wed, 27 Nov 2019 14:01:46 +0100 Subject: [PATCH 11/12] make sure we call onQueryFetchFailure once per shard group --- .../search/AbstractSearchAsyncAction.java | 15 ++++++++-- .../search/CanMatchPreFilterSearchPhase.java | 2 +- .../action/search/DfsQueryPhase.java | 1 + .../action/search/FetchSearchPhase.java | 16 ++-------- .../action/search/SearchPhaseController.java | 13 ++------ .../action/search/SearchPhaseResults.java | 2 +- .../action/search/SearchProgressListener.java | 30 +++++++++++++++---- .../SearchQueryThenFetchAsyncAction.java | 7 +++++ .../action/search/SearchShard.java | 1 + 9 files changed, 53 insertions(+), 34 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 
7a7b6ae7df43d..ca68bb4008146 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.AliasFilter; @@ -381,6 +382,7 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, logger.trace(new ParameterizedMessage("{}: Failed to execute [{}]", shard, request), e); } } + onShardGroupFailure(shardIndex, e); onPhaseDone(); } else { final ShardRouting nextShard = shardIt.nextOrNull(); @@ -389,7 +391,7 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, logger.trace(() -> new ParameterizedMessage( "{}: Failed to execute [{}] lastShard [{}]", shard != null ? shard.shortSummary() : shardIt.shardId(), request, lastShard), e); - if (!lastShard) { + if (lastShard == false) { performPhaseOnShard(shardIndex, shardIt, nextShard); } else { // no more shards active, add a failure @@ -400,10 +402,19 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, shard != null ? shard.shortSummary() : shardIt.shardId(), request, lastShard), e); } } + onShardGroupFailure(shardIndex, e); } } } + /** + * Executed once for every {@link ShardId} that failed on all available shard routing. + * + * @param shardIndex the shard target that failed + * @param exc the final failure reason + */ + protected void onShardGroupFailure(int shardIndex, Exception exc) {} + /** * Executed once for every failed shard level request. This method is invoked before the next replica is tried for the given * shard target. @@ -443,7 +454,7 @@ public final void onShardFailure(final int shardIndex, @Nullable SearchShardTarg successfulOps.decrementAndGet(); // if this shard was successful before (initial phase) we have to adjust the counter } } - results.consumeShardFailure(shardIndex, e); + results.consumeShardFailure(shardIndex); } /** diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index 1b1cd8efdb0d5..aba32d2c850a0 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -159,7 +159,7 @@ boolean hasResult(int shardIndex) { } @Override - void consumeShardFailure(int shardIndex, Exception exc) { + void consumeShardFailure(int shardIndex) { // we have to carry over shard failures in order to account for them in the response. 
consumeResult(shardIndex, true, null); } diff --git a/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java b/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java index ff766987f5464..b4d52fa418e10 100644 --- a/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java @@ -97,6 +97,7 @@ public void onFailure(Exception exception) { try { context.getLogger().debug(() -> new ParameterizedMessage("[{}] Failed to execute query phase", querySearchRequest.id()), exception); + progressListener.notifyQueryFailure(shardIndex, exception); counter.onFailure(shardIndex, searchShardTarget, exception); } finally { // the query might not have been executed at all (for example because thread pool rejected diff --git a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java index 3ce4372dfd7ec..41d216072e4b2 100644 --- a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java @@ -25,7 +25,6 @@ import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.fetch.FetchSearchResult; @@ -136,13 +135,8 @@ private void innerRun() throws IOException { // we do this as we go since it will free up resources and passing on the request on the // transport layer is cheap. releaseIrrelevantSearchContext(queryResult.queryResult()); - - // propagate empty result to the progress listener - FetchSearchResult result = new FetchSearchResult(); - result.setSearchShardTarget(queryResult.getSearchShardTarget()); - result.hits(SearchHits.empty()); + progressListener.notifyFetchResult(i); } - progressListener.notifyFetchResult(i); // in any case we count down this result since we don't talk to this shard anymore counter.countDown(); } else { @@ -173,22 +167,19 @@ private void executeFetch(final int shardIndex, final SearchShardTarget shardTar new SearchActionListener(shardTarget, shardIndex) { @Override public void innerOnResponse(FetchSearchResult result) { - boolean success = false; try { + progressListener.notifyFetchResult(shardIndex); counter.onResult(result); - success = true; } catch (Exception e) { context.onPhaseFailure(FetchSearchPhase.this, "", e); } - if (success) { - progressListener.notifyFetchResult(shardIndex); - } } @Override public void onFailure(Exception e) { try { logger.debug(() -> new ParameterizedMessage("[{}] Failed to execute fetch phase", fetchSearchRequest.id()), e); + progressListener.notifyFetchFailure(shardIndex, e); counter.onFailure(shardIndex, shardTarget, e); } finally { // the search context might not be cleared on the node where the fetch was executed for example @@ -196,7 +187,6 @@ public void onFailure(Exception e) { // request to clear the search context. 
releaseIrrelevantSearchContext(querySearchResult); } - progressListener.onFetchFailure(shardIndex, e); } }); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index a423ead7c9408..b3562cc002b5b 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -578,6 +578,8 @@ static final class QueryPhaseResultConsumer extends ArraySearchPhaseResults resultList = results.asList(); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseResults.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseResults.java index f336aa6d317b3..e81cf4b74e234 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseResults.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseResults.java @@ -56,7 +56,7 @@ final int getNumShards() { */ abstract boolean hasResult(int shardIndex); - void consumeShardFailure(int shardIndex, Exception exc) {} + void consumeShardFailure(int shardIndex) {} AtomicArray getAtomicArray() { throw new UnsupportedOperationException(); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java b/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java index 95328fd08b486..aea232066d2f6 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java @@ -53,17 +53,17 @@ public void onListShards(List shards, boolean fetchPhase) {} /** * Executed when a shard returns a query result. * - * @param shardIndex The index of the shard in the list provided by onListShards. + * @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards(List, boolean)} )}. */ public void onQueryResult(int shardIndex) {} /** * Executed when a shard reports a query failure. * - * @param shardIndex The index of the shard in the list provided by onListShards. + * @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards(List, boolean)})}. * @param exc The cause of the failure. */ - public void onQueryFailure(int shardIndex, Exception exc) { } + public void onQueryFailure(int shardIndex, Exception exc) {} /** * Executed when a partial reduce is created. The number of partial reduce can be controlled via @@ -86,16 +86,16 @@ public void onPartialReduce(List shards, TotalHits totalHits, Inter public void onReduce(List shards, TotalHits totalHits, InternalAggregations aggs) {} /** - * Executed when a shard returns a query result. + * Executed when a shard returns a fetch result. * - * @param shardIndex The index of the shard in the list provided by onListShards. + * @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards(List, boolean)})}. */ public void onFetchResult(int shardIndex) {} /** * Executed when a shard reports a fetch failure. * - * @param shardIndex The index of the shard in the list provided by onListShards. + * @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards(List, boolean)})}. * @param exc The cause of the failure. 
*/ public void onFetchFailure(int shardIndex, Exception exc) {} @@ -118,6 +118,15 @@ final void notifyQueryResult(int shardIndex) { } } + final void notifyQueryFailure(int shardIndex, Exception exc) { + try { + onQueryFailure(shardIndex, exc); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}] Failed to execute progress listener on query failure", + shards.get(shardIndex)), e); + } + } + final void notifyPartialReduce(List shards, TotalHits totalHits, InternalAggregations aggs, int version) { try { onPartialReduce(shards, totalHits, aggs, version); @@ -143,6 +152,15 @@ final void notifyFetchResult(int shardIndex) { } } + final void notifyFetchFailure(int shardIndex, Exception exc) { + try { + onFetchFailure(shardIndex, exc); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}] Failed to execute progress listener on fetch failure", + shards.get(shardIndex)), e); + } + } + final List searchShards(List results) { return results.stream() .filter(Objects::nonNull) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index b2c9039c427b7..d5060b728347d 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -36,6 +36,7 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { private final SearchPhaseController searchPhaseController; + private final SearchProgressListener progressListener; SearchQueryThenFetchAsyncAction(final Logger logger, final SearchTransportService searchTransportService, final BiFunction nodeIdToConnection, final Map aliasFilter, @@ -49,6 +50,7 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction results, final SearchPhaseContext context) { return new FetchSearchPhase(results, searchPhaseController, context); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchShard.java b/server/src/main/java/org/elasticsearch/action/search/SearchShard.java index ff60d2f52e466..16459d81885ce 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchShard.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchShard.java @@ -43,6 +43,7 @@ public class SearchShard implements Comparable { * Return the cluster alias if the shard is on a remote cluster and null * otherwise (local). 
*/ + @Nullable public String getClusterAlias() { return clusterAlias; } From 4120a78db2aab85c25429c3c6d51dd08703ea889 Mon Sep 17 00:00:00 2001 From: jimczi Date: Wed, 27 Nov 2019 14:03:11 +0100 Subject: [PATCH 12/12] address comment --- .../elasticsearch/action/search/SearchProgressListener.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java b/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java index aea232066d2f6..c5b3d35159491 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java @@ -166,12 +166,12 @@ final List searchShards(List results) .filter(Objects::nonNull) .map(SearchPhaseResult::getSearchShardTarget) .map(e -> new SearchShard(e.getClusterAlias(), e.getShardId())) - .collect(Collectors.toList()); + .collect(Collectors.toUnmodifiableList()); } final List searchShards(GroupShardsIterator its) { return StreamSupport.stream(its.spliterator(), false) .map(e -> new SearchShard(e.getClusterAlias(), e.shardId())) - .collect(Collectors.toList()); + .collect(Collectors.toUnmodifiableList()); } }
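
As a quick illustration of what the patches above converge on: SearchProgressListener stays package-private, its hooks receive shard indices into the list reported by onListShards, and the notify* wrappers catch and log listener failures instead of failing the search. A minimal subclass that tallies per-shard events, in the spirit of SearchProgressActionListenerIT, could look like the sketch below. This is only a sketch: the class name ShardCountingProgressListener is hypothetical, the generic type parameters (List<SearchShard> and friends) are reconstructed because the diff text above elides them, and wiring such a listener into a concrete search request is outside the scope of these hunks.

    package org.elasticsearch.action.search;

    import org.apache.lucene.search.TotalHits;
    import org.elasticsearch.search.aggregations.InternalAggregations;

    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    // Lives in the same package because SearchProgressListener is package-private.
    class ShardCountingProgressListener extends SearchProgressListener {

        private final AtomicInteger queryResults = new AtomicInteger();
        private final AtomicInteger queryFailures = new AtomicInteger();
        private final AtomicInteger fetchResults = new AtomicInteger();
        private final AtomicInteger fetchFailures = new AtomicInteger();
        private volatile List<SearchShard> shards = List.of();

        @Override
        public void onListShards(List<SearchShard> shards, boolean fetchPhase) {
            // every shardIndex reported below is an index into this list
            this.shards = shards;
        }

        @Override
        public void onQueryResult(int shardIndex) {
            queryResults.incrementAndGet();
        }

        @Override
        public void onQueryFailure(int shardIndex, Exception exc) {
            // reported once per shard group, after all routings of the shard have failed
            queryFailures.incrementAndGet();
        }

        @Override
        public void onPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int version) {
            // runs whenever the query-phase consumer performs an intermediate reduce
        }

        @Override
        public void onReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs) {
            // final reduce of the query phase
        }

        @Override
        public void onFetchResult(int shardIndex) {
            fetchResults.incrementAndGet();
        }

        @Override
        public void onFetchFailure(int shardIndex, Exception exc) {
            fetchFailures.incrementAndGet();
        }
    }

Exceptions thrown from any of these overrides are caught by the notify* wrappers and surfaced only as a warning log, so a misbehaving listener does not fail the search request itself.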