From b555eca3ea46bf8eab8aaf2dbd9454fe7bf9e4b9 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 10 Jul 2017 16:53:12 +0200 Subject: [PATCH 1/7] Limit the number of concurrent shard requests per search request This is a protection mechanism to prevent a single search request from hitting a large number of shards in the cluster concurrently. If a search is executed against all indices in the cluster this can easily overload the cluster causing rejections etc. which is not necessarily desirable. Instead this PR adds a per request limit of `max_concurrent_shard_requests` that throttles the number of concurrent initial phase requests to `256` by default. This limit can be increased per request and protects single search requests from overloading the cluster. Subsequent PRs can introduces addiontional improvemetns ie. limiting this on a `_msearch` level, making defaults a factor of the number of nodes or sort shards iters such that we gain the best concurrency across nodes. --- .../action/search/InitialSearchPhase.java | 37 ++++--- .../action/search/SearchRequest.java | 38 ++++++- .../action/search/SearchRequestBuilder.java | 10 ++ .../cluster/routing/GroupShardsIterator.java | 4 + .../rest/action/search/RestSearchAction.java | 3 + .../action/search/SearchAsyncActionTests.java | 99 +++++++++++++++++++ .../resources/rest-api-spec/api/search.json | 5 + .../test/client/RandomizingClient.java | 4 +- 8 files changed, 186 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java index de58b1906427f..e77a5c6ba42b1 100644 --- a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java +++ b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java @@ -33,6 +33,9 @@ import org.elasticsearch.transport.ConnectTransportException; import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; @@ -50,6 +53,10 @@ abstract class InitialSearchPhase extends private final Logger logger; private final int expectedTotalOps; private final AtomicInteger totalOps = new AtomicInteger(); + private final AtomicInteger shardExecutionIndex = new AtomicInteger(0); + private final int concurrentRunnables; + + InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator shardsIts, Logger logger) { super(name); @@ -61,6 +68,7 @@ abstract class InitialSearchPhase extends // on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result // we process hence we add one for the non active partition here. this.expectedTotalOps = shardsIts.totalSizeWith1ForEmpty(); + concurrentRunnables = Math.min(request.getMaxNumConcurrentShardRequests(), shardsIts.size()); } private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId, @@ -105,6 +113,7 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, inner); } } else { + maybeExecuteNext(); // move to the next execution if needed // no more shards active, add a failure if (logger.isDebugEnabled() && !logger.isTraceEnabled()) { // do not double log this exception if (e != null && !TransportActions.isShardNotAvailableException(e)) { @@ -124,23 +133,26 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Override public final void run() throws IOException { - int shardIndex = -1; - for (final SearchShardIterator shardIt : shardsIts) { - shardIndex++; - final ShardRouting shard = shardIt.nextOrNull(); - if (shard != null) { - performPhaseOnShard(shardIndex, shardIt, shard); - } else { - // really, no shards active in this group - onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); - } + boolean success = shardExecutionIndex.compareAndSet(0, concurrentRunnables); + assert success; + for (int i = 0; i < concurrentRunnables; i++) { + int index = i; + SearchShardIterator shardRoutings = shardsIts.get(index); + performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull()); } } + private void maybeExecuteNext() { + final int index = shardExecutionIndex.getAndIncrement(); + if (index < shardsIts.size()) { + SearchShardIterator shardRoutings = shardsIts.get(index); + performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull()); + } + } + + private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) { if (shard == null) { - // TODO upgrade this to an assert... - // no more active shards... (we should not really get here, but just for safety) onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); } else { try { @@ -166,6 +178,7 @@ public void onFailure(Exception t) { } private void onShardResult(FirstResult result, ShardIterator shardIt) { + maybeExecuteNext(); assert result.getShardIndex() != -1 : "shard index is not set"; assert result.getSearchShardTarget() != null : "search shard target must not be null"; onShardSuccess(result); diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 01a3e94620a46..10b2ac9971565 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.search; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; @@ -74,6 +75,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest private int batchedReduceSize = 512; + private int maxNunConcurrentShardRequests = 256; + private String[] types = Strings.EMPTY_ARRAY; public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosed(); @@ -302,6 +305,27 @@ public int getBatchedReduceSize() { return batchedReduceSize; } + /** + * Returns the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to + * reduce the number of shard reqeusts fired per high level search request. Searches that hit the entire cluster can be throttled + * with this number to reduce the cluster load. The default is 256 + */ + public int getMaxNumConcurrentShardRequests() { + return maxNunConcurrentShardRequests; + } + + /** + * Sets the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to + * reduce the number of shard reqeusts fired per high level search request. Searches that hit the entire cluster can be throttled + * with this number to reduce the cluster load. The default is 256 + */ + public void setMaxNumConcurrentShardRequests(int maxNunConcurrentShardRequests) { + if (maxNunConcurrentShardRequests < 1) { + throw new IllegalArgumentException("maxNunConcurrentShardRequests must be >= 1"); + } + this.maxNunConcurrentShardRequests = maxNunConcurrentShardRequests; + } + /** * @return true if the request only has suggest */ @@ -349,6 +373,11 @@ public void readFrom(StreamInput in) throws IOException { indicesOptions = IndicesOptions.readIndicesOptions(in); requestCache = in.readOptionalBoolean(); batchedReduceSize = in.readVInt(); + if (in.getVersion().onOrAfter(Version.V_6_0_0_beta1)) { + maxNunConcurrentShardRequests = in.readVInt(); + } + + } @Override @@ -367,6 +396,9 @@ public void writeTo(StreamOutput out) throws IOException { indicesOptions.writeIndicesOptions(out); out.writeOptionalBoolean(requestCache); out.writeVInt(batchedReduceSize); + if (out.getVersion().onOrAfter(Version.V_6_0_0_beta1)) { + out.writeVInt(maxNunConcurrentShardRequests); + } } @Override @@ -386,13 +418,15 @@ public boolean equals(Object o) { Objects.equals(requestCache, that.requestCache) && Objects.equals(scroll, that.scroll) && Arrays.equals(types, that.types) && + Objects.equals(batchedReduceSize, that.batchedReduceSize) && + Objects.equals(maxNunConcurrentShardRequests, that.maxNunConcurrentShardRequests) && Objects.equals(indicesOptions, that.indicesOptions); } @Override public int hashCode() { return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache, - scroll, Arrays.hashCode(types), indicesOptions); + scroll, Arrays.hashCode(types), indicesOptions, maxNunConcurrentShardRequests); } @Override @@ -406,6 +440,8 @@ public String toString() { ", preference='" + preference + '\'' + ", requestCache=" + requestCache + ", scroll=" + scroll + + ", maxNunConcurrentShardRequests=" + maxNunConcurrentShardRequests + + ", batchedReduceSize=" + batchedReduceSize + ", source=" + source + '}'; } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java index 8b9879b2fa2f1..9f93dfbb4ddad 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java @@ -525,4 +525,14 @@ public SearchRequestBuilder setBatchedReduceSize(int batchedReduceSize) { this.request.setBatchedReduceSize(batchedReduceSize); return this; } + + /** + * Sets the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to + * reduce the number of shard reqeusts fired per high level search request. Searches that hit the entire cluster can be throttled + * with this number to reduce the cluster load. The default is 256 + */ + public SearchRequestBuilder setMaxConcurrentShardRequests(int maxNunConcurrentShardRequests) { + this.request.setMaxNumConcurrentShardRequests(maxNunConcurrentShardRequests); + return this; + } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java b/core/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java index 7b33c24d15ffd..21b02043a2249 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java @@ -78,4 +78,8 @@ public int size() { public Iterator iterator() { return iterators.iterator(); } + + public ShardIt get(int index) { + return iterators.get(index); + } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index 3f04603c2da1b..ab34a7318f207 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -99,6 +99,9 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize()); searchRequest.setBatchedReduceSize(batchedReduceSize); + final int maxConcurrentShardRequests = request.paramAsInt("max_concurrent_shard_requests", searchRequest.getBatchedReduceSize()); + searchRequest.setMaxNumConcurrentShardRequests(maxConcurrentShardRequests); + // do not allow 'query_and_fetch' or 'dfs_query_and_fetch' search types // from the REST layer. these modes are an internal optimization and should // not be specified explicitly by the user. diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 878cb7e61266b..8fad93984bde5 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -48,14 +48,113 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; public class SearchAsyncActionTests extends ESTestCase { + public void testLimitConcurrentShardRequests() throws InterruptedException { + SearchRequest request = new SearchRequest(); + int numConcurrent = randomIntBetween(1, 5); + request.setMaxNumConcurrentShardRequests(numConcurrent); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference response = new AtomicReference<>(); + ActionListener responseListener = new ActionListener() { + @Override + public void onResponse(SearchResponse searchResponse) { + response.set((TestSearchResponse) searchResponse); + } + + @Override + public void onFailure(Exception e) { + logger.warn("test failed", e); + fail(e.getMessage()); + } + }; + DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); + + AtomicInteger contextIdGenerator = new AtomicInteger(0); + GroupShardsIterator shardsIter = getShardsIter("idx", + new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed()), + 10, randomBoolean(), primaryNode, replicaNode); + SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null); + Map lookup = new HashMap<>(); + Map seenShard = new ConcurrentHashMap<>(); + lookup.put(primaryNode.getId(), new MockConnection(primaryNode)); + lookup.put(replicaNode.getId(), new MockConnection(replicaNode)); + Map aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)); + CountDownLatch awaitInitialRequests = new CountDownLatch(1); + AtomicInteger numRequests = new AtomicInteger(0); + AtomicInteger numResponses = new AtomicInteger(0); + AbstractSearchAsyncAction asyncAction = + new AbstractSearchAsyncAction( + "test", + logger, + transportService, + (cluster, node) -> { + assert cluster == null : "cluster was not null: " + cluster; + return lookup.get(node); }, + aliasFilters, + Collections.emptyMap(), + null, + request, + responseListener, + shardsIter, + new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0), + 0, + null, + new InitialSearchPhase.SearchPhaseResults<>(shardsIter.size())) { + + @Override + protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard, + SearchActionListener listener) { + seenShard.computeIfAbsent(shard.shardId(), (i) -> { + numRequests.incrementAndGet(); // only count this once per replica + return Boolean.TRUE; + }); + + new Thread(() -> { + try { + awaitInitialRequests.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + Transport.Connection connection = getConnection(null, shard.currentNodeId()); + TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(), + connection.getNode()); + if (numResponses.getAndIncrement() > 0 && randomBoolean()) { // at least one response otherwise the entire + // request fails + listener.onFailure(new RuntimeException()); + } else { + listener.onResponse(testSearchPhaseResult); + } + + }).start(); + } + + @Override + protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { + return new SearchPhase("test") { + @Override + public void run() throws IOException { + latch.countDown(); + } + }; + } + }; + asyncAction.start(); + assertEquals(numConcurrent, numRequests.get()); + awaitInitialRequests.countDown(); + latch.await(); + assertEquals(10, numRequests.get()); + } + public void testFanOutAndCollect() throws InterruptedException { SearchRequest request = new SearchRequest(); + request.setMaxNumConcurrentShardRequests(randomIntBetween(1, 100)); CountDownLatch latch = new CountDownLatch(1); AtomicReference response = new AtomicReference<>(); ActionListener responseListener = new ActionListener() { diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json index 5b4b9b681d5ab..3caa0d1d29cb8 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json @@ -163,6 +163,11 @@ "type" : "number", "description" : "The number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.", "default" : 512 + }, + "max_concurrent_shard_requests" : { + "type" : "number", + "description" : "The number of concurrent shard requests this search executes concurrently. This values should be used to limitthe impact of the search on the cluster in order to limit the number of concurrent shard requests", + "default" : 256 } } }, diff --git a/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java b/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java index a7d9a72e6b77c..71c5c271815d5 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java +++ b/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java @@ -37,6 +37,7 @@ public class RandomizingClient extends FilterClient { private final SearchType defaultSearchType; private final String defaultPreference; private final int batchedReduceSize; + private final int maxConcurrentRequests; public RandomizingClient(Client client, Random random) { @@ -55,13 +56,14 @@ public RandomizingClient(Client client, Random random) { defaultPreference = null; } this.batchedReduceSize = 2 + random.nextInt(10); + this.maxConcurrentRequests = 1 + random.nextInt(100); } @Override public SearchRequestBuilder prepareSearch(String... indices) { return in.prepareSearch(indices).setSearchType(defaultSearchType).setPreference(defaultPreference) - .setBatchedReduceSize(batchedReduceSize); + .setBatchedReduceSize(batchedReduceSize).setMaxConcurrentShardRequests(maxConcurrentRequests); } @Override From 837548904ed2d6fcd73c30d5d58738da88e423d4 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 10 Jul 2017 17:02:03 +0200 Subject: [PATCH 2/7] org imports: --- .../org/elasticsearch/action/search/InitialSearchPhase.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java index e77a5c6ba42b1..5f2c0809105cb 100644 --- a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java +++ b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java @@ -33,9 +33,6 @@ import org.elasticsearch.transport.ConnectTransportException; import java.io.IOException; -import java.util.ArrayList; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; @@ -56,8 +53,6 @@ abstract class InitialSearchPhase extends private final AtomicInteger shardExecutionIndex = new AtomicInteger(0); private final int concurrentRunnables; - - InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator shardsIts, Logger logger) { super(name); this.request = request; From f725e3403b77849db71a680e4676874d3d75ea50 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 11 Jul 2017 08:03:11 +0200 Subject: [PATCH 3/7] apply feedback --- .../action/search/InitialSearchPhase.java | 7 +++--- .../action/search/SearchRequest.java | 24 +++++++++---------- .../action/search/SearchRequestBuilder.java | 2 +- .../rest/action/search/RestSearchAction.java | 5 ++-- .../action/search/SearchAsyncActionTests.java | 4 ++-- .../resources/rest-api-spec/api/search.json | 3 ++- .../test/client/RandomizingClient.java | 6 ++--- 7 files changed, 26 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java index 5f2c0809105cb..5fcd8833df423 100644 --- a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java +++ b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java @@ -63,7 +63,7 @@ abstract class InitialSearchPhase extends // on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result // we process hence we add one for the non active partition here. this.expectedTotalOps = shardsIts.totalSizeWith1ForEmpty(); - concurrentRunnables = Math.min(request.getMaxNumConcurrentShardRequests(), shardsIts.size()); + concurrentRunnables = Math.min(request.getMaxConcurrentShardRequests(), shardsIts.size()); } private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId, @@ -131,9 +131,8 @@ public final void run() throws IOException { boolean success = shardExecutionIndex.compareAndSet(0, concurrentRunnables); assert success; for (int i = 0; i < concurrentRunnables; i++) { - int index = i; - SearchShardIterator shardRoutings = shardsIts.get(index); - performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull()); + SearchShardIterator shardRoutings = shardsIts.get(i); + performPhaseOnShard(i, shardRoutings, shardRoutings.nextOrNull()); } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 10b2ac9971565..53eb005345791 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -75,7 +75,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest private int batchedReduceSize = 512; - private int maxNunConcurrentShardRequests = 256; + private int maxConcurrentShardRequests = 256; private String[] types = Strings.EMPTY_ARRAY; @@ -310,8 +310,8 @@ public int getBatchedReduceSize() { * reduce the number of shard reqeusts fired per high level search request. Searches that hit the entire cluster can be throttled * with this number to reduce the cluster load. The default is 256 */ - public int getMaxNumConcurrentShardRequests() { - return maxNunConcurrentShardRequests; + public int getMaxConcurrentShardRequests() { + return maxConcurrentShardRequests; } /** @@ -319,11 +319,11 @@ public int getMaxNumConcurrentShardRequests() { * reduce the number of shard reqeusts fired per high level search request. Searches that hit the entire cluster can be throttled * with this number to reduce the cluster load. The default is 256 */ - public void setMaxNumConcurrentShardRequests(int maxNunConcurrentShardRequests) { - if (maxNunConcurrentShardRequests < 1) { - throw new IllegalArgumentException("maxNunConcurrentShardRequests must be >= 1"); + public void setMaxConcurrentShardRequests(int maxConcurrentShardRequests) { + if (maxConcurrentShardRequests < 1) { + throw new IllegalArgumentException("maxConcurrentShardRequests must be >= 1"); } - this.maxNunConcurrentShardRequests = maxNunConcurrentShardRequests; + this.maxConcurrentShardRequests = maxConcurrentShardRequests; } /** @@ -374,7 +374,7 @@ public void readFrom(StreamInput in) throws IOException { requestCache = in.readOptionalBoolean(); batchedReduceSize = in.readVInt(); if (in.getVersion().onOrAfter(Version.V_6_0_0_beta1)) { - maxNunConcurrentShardRequests = in.readVInt(); + maxConcurrentShardRequests = in.readVInt(); } @@ -397,7 +397,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalBoolean(requestCache); out.writeVInt(batchedReduceSize); if (out.getVersion().onOrAfter(Version.V_6_0_0_beta1)) { - out.writeVInt(maxNunConcurrentShardRequests); + out.writeVInt(maxConcurrentShardRequests); } } @@ -419,14 +419,14 @@ public boolean equals(Object o) { Objects.equals(scroll, that.scroll) && Arrays.equals(types, that.types) && Objects.equals(batchedReduceSize, that.batchedReduceSize) && - Objects.equals(maxNunConcurrentShardRequests, that.maxNunConcurrentShardRequests) && + Objects.equals(maxConcurrentShardRequests, that.maxConcurrentShardRequests) && Objects.equals(indicesOptions, that.indicesOptions); } @Override public int hashCode() { return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache, - scroll, Arrays.hashCode(types), indicesOptions, maxNunConcurrentShardRequests); + scroll, Arrays.hashCode(types), indicesOptions, maxConcurrentShardRequests); } @Override @@ -440,7 +440,7 @@ public String toString() { ", preference='" + preference + '\'' + ", requestCache=" + requestCache + ", scroll=" + scroll + - ", maxNunConcurrentShardRequests=" + maxNunConcurrentShardRequests + + ", maxConcurrentShardRequests=" + maxConcurrentShardRequests + ", batchedReduceSize=" + batchedReduceSize + ", source=" + source + '}'; } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java index 9f93dfbb4ddad..b74599ddaf915 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java @@ -532,7 +532,7 @@ public SearchRequestBuilder setBatchedReduceSize(int batchedReduceSize) { * with this number to reduce the cluster load. The default is 256 */ public SearchRequestBuilder setMaxConcurrentShardRequests(int maxNunConcurrentShardRequests) { - this.request.setMaxNumConcurrentShardRequests(maxNunConcurrentShardRequests); + this.request.setMaxConcurrentShardRequests(maxNunConcurrentShardRequests); return this; } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index ab34a7318f207..fb4a2303ba014 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -99,8 +99,9 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize()); searchRequest.setBatchedReduceSize(batchedReduceSize); - final int maxConcurrentShardRequests = request.paramAsInt("max_concurrent_shard_requests", searchRequest.getBatchedReduceSize()); - searchRequest.setMaxNumConcurrentShardRequests(maxConcurrentShardRequests); + final int maxConcurrentShardRequests = request.paramAsInt("max_concurrent_shard_requests", + searchRequest.getMaxConcurrentShardRequests()); + searchRequest.setMaxConcurrentShardRequests(maxConcurrentShardRequests); // do not allow 'query_and_fetch' or 'dfs_query_and_fetch' search types // from the REST layer. these modes are an internal optimization and should diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 8fad93984bde5..b0bf4cc6626fd 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -58,7 +58,7 @@ public class SearchAsyncActionTests extends ESTestCase { public void testLimitConcurrentShardRequests() throws InterruptedException { SearchRequest request = new SearchRequest(); int numConcurrent = randomIntBetween(1, 5); - request.setMaxNumConcurrentShardRequests(numConcurrent); + request.setMaxConcurrentShardRequests(numConcurrent); CountDownLatch latch = new CountDownLatch(1); AtomicReference response = new AtomicReference<>(); ActionListener responseListener = new ActionListener() { @@ -154,7 +154,7 @@ public void run() throws IOException { public void testFanOutAndCollect() throws InterruptedException { SearchRequest request = new SearchRequest(); - request.setMaxNumConcurrentShardRequests(randomIntBetween(1, 100)); + request.setMaxConcurrentShardRequests(randomIntBetween(1, 100)); CountDownLatch latch = new CountDownLatch(1); AtomicReference response = new AtomicReference<>(); ActionListener responseListener = new ActionListener() { diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json index 3caa0d1d29cb8..b00bc821040a1 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json @@ -166,7 +166,8 @@ }, "max_concurrent_shard_requests" : { "type" : "number", - "description" : "The number of concurrent shard requests this search executes concurrently. This values should be used to limitthe impact of the search on the cluster in order to limit the number of concurrent shard requests", + "description" : "The number of concurrent shard requests this search executes concurrently. This value should be used to limit the impact of the search on the cluster in order to limit the number of concurrent shard requests", + "default" : 256 } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java b/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java index 71c5c271815d5..b9f3ce3b68209 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java +++ b/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java @@ -37,7 +37,7 @@ public class RandomizingClient extends FilterClient { private final SearchType defaultSearchType; private final String defaultPreference; private final int batchedReduceSize; - private final int maxConcurrentRequests; + private final int maxConcurrentShardRequests; public RandomizingClient(Client client, Random random) { @@ -56,14 +56,14 @@ public RandomizingClient(Client client, Random random) { defaultPreference = null; } this.batchedReduceSize = 2 + random.nextInt(10); - this.maxConcurrentRequests = 1 + random.nextInt(100); + this.maxConcurrentShardRequests = 1 + random.nextInt(1 << random.nextInt(8)); } @Override public SearchRequestBuilder prepareSearch(String... indices) { return in.prepareSearch(indices).setSearchType(defaultSearchType).setPreference(defaultPreference) - .setBatchedReduceSize(batchedReduceSize).setMaxConcurrentShardRequests(maxConcurrentRequests); + .setBatchedReduceSize(batchedReduceSize).setMaxConcurrentShardRequests(maxConcurrentShardRequests); } @Override From 51f9653e69325ff388ba484941662e3ad7dd7084 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 11 Jul 2017 08:04:02 +0200 Subject: [PATCH 4/7] fix spelling --- .../org/elasticsearch/action/search/SearchRequestBuilder.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java index b74599ddaf915..6e7034f1813d7 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java @@ -531,8 +531,8 @@ public SearchRequestBuilder setBatchedReduceSize(int batchedReduceSize) { * reduce the number of shard reqeusts fired per high level search request. Searches that hit the entire cluster can be throttled * with this number to reduce the cluster load. The default is 256 */ - public SearchRequestBuilder setMaxConcurrentShardRequests(int maxNunConcurrentShardRequests) { - this.request.setMaxConcurrentShardRequests(maxNunConcurrentShardRequests); + public SearchRequestBuilder setMaxConcurrentShardRequests(int maxConcurrentShardRequests) { + this.request.setMaxConcurrentShardRequests(maxConcurrentShardRequests); return this; } } From 1089012e986eeade5a130c9796f969759b32bce2 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 11 Jul 2017 11:17:22 +0200 Subject: [PATCH 5/7] make default a function of nodes and document stuff --- .../action/search/SearchRequest.java | 17 +++++++++++----- .../action/search/SearchRequestBuilder.java | 4 ++-- .../action/search/TransportSearchAction.java | 20 +++++++++++++++---- .../rest/action/search/RestSearchAction.java | 10 +++++++--- docs/reference/search/search.asciidoc | 7 +++++++ .../resources/rest-api-spec/api/search.json | 3 +-- .../test/client/RandomizingClient.java | 15 ++++++++++---- 7 files changed, 56 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 53eb005345791..f5eddacafd15f 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -75,7 +75,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest private int batchedReduceSize = 512; - private int maxConcurrentShardRequests = 256; + private int maxConcurrentShardRequests = 0; private String[] types = Strings.EMPTY_ARRAY; @@ -308,16 +308,16 @@ public int getBatchedReduceSize() { /** * Returns the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to * reduce the number of shard reqeusts fired per high level search request. Searches that hit the entire cluster can be throttled - * with this number to reduce the cluster load. The default is 256 + * with this number to reduce the cluster load. The default grows with the number of nodes in the cluster but is at most 256. */ public int getMaxConcurrentShardRequests() { - return maxConcurrentShardRequests; + return maxConcurrentShardRequests == 0 ? 256 : maxConcurrentShardRequests; } /** * Sets the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to - * reduce the number of shard reqeusts fired per high level search request. Searches that hit the entire cluster can be throttled - * with this number to reduce the cluster load. The default is 256 + * reduce the number of shard requests fired per high level search request. Searches that hit the entire cluster can be throttled + * with this number to reduce the cluster load. The default grows with the number of nodes in the cluster but is at most 256. */ public void setMaxConcurrentShardRequests(int maxConcurrentShardRequests) { if (maxConcurrentShardRequests < 1) { @@ -326,6 +326,13 @@ public void setMaxConcurrentShardRequests(int maxConcurrentShardRequests) { this.maxConcurrentShardRequests = maxConcurrentShardRequests; } + /** + * Returns true iff the maxConcurrentShardRequest is set. + */ + boolean isMaxConcurrentShardRequestsSet() { + return maxConcurrentShardRequests != 0; + } + /** * @return true if the request only has suggest */ diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java index 6e7034f1813d7..49e25f67493cb 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java @@ -528,8 +528,8 @@ public SearchRequestBuilder setBatchedReduceSize(int batchedReduceSize) { /** * Sets the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to - * reduce the number of shard reqeusts fired per high level search request. Searches that hit the entire cluster can be throttled - * with this number to reduce the cluster load. The default is 256 + * reduce the number of shard requests fired per high level search request. Searches that hit the entire cluster can be throttled + * with this number to reduce the cluster load. The default grows with the number of nodes in the cluster but is at most 256. */ public SearchRequestBuilder setMaxConcurrentShardRequests(int maxConcurrentShardRequests) { this.request.setMaxConcurrentShardRequests(maxConcurrentShardRequests); diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 51681a62b3ad3..de2dfea6b8d1a 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -184,7 +185,8 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener< OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); if (remoteClusterIndices.isEmpty()) { executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteClusterIndices, Collections.emptyList(), - (clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener); + (clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, clusterState.getNodes() + .getDataNodes().size()); } else { remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(), remoteClusterIndices, ActionListener.wrap((searchShardsResponses) -> { @@ -192,8 +194,10 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener< Map remoteAliasFilters = new HashMap<>(); BiFunction clusterNodeLookup = processRemoteShards(searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters); + int numNodesInvovled = searchShardsResponses.values().stream().mapToInt(r -> r.getNodes().length).sum() + + clusterState.getNodes().getDataNodes().size(); executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteClusterIndices, remoteShardIterators, - clusterNodeLookup, clusterState, remoteAliasFilters, listener); + clusterNodeLookup, clusterState, remoteAliasFilters, listener, numNodesInvovled); }, listener::onFailure)); } } @@ -250,7 +254,7 @@ static BiFunction processRemoteShards(Map remoteClusterIndices, List remoteShardIterators, BiFunction remoteConnections, ClusterState clusterState, - Map remoteAliasMap, ActionListener listener) { + Map remoteAliasMap, ActionListener listener, int nodeCount) { clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name @@ -303,7 +307,15 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea } return searchTransportService.getConnection(clusterName, discoveryNode); }; - + if (searchRequest.isMaxConcurrentShardRequestsSet() == false) { + // we try to set a default of max concurrent shard requests based on + // the node count but upper-bound it by 256 by default to keep it sane. A single + // search request that fans out lots of shards should hit a cluster too hard while 256 is already a lot + // we multiply is by the default number of shards such that a single request in a cluster of 1 would hit all shards of a + // default index. + searchRequest.setMaxConcurrentShardRequests(Math.min(256, nodeCount + * IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getDefault(Settings.EMPTY))); + } searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(), Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener).start(); } diff --git a/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index fb4a2303ba014..b871446ba8853 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -99,9 +99,13 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize()); searchRequest.setBatchedReduceSize(batchedReduceSize); - final int maxConcurrentShardRequests = request.paramAsInt("max_concurrent_shard_requests", - searchRequest.getMaxConcurrentShardRequests()); - searchRequest.setMaxConcurrentShardRequests(maxConcurrentShardRequests); + if (request.hasParam("max_concurrent_shard_requests")) { + // only set if we have the parameter since we auto adjust the max concurrency on the coordinator + // based on the number of nodes in the cluster + final int maxConcurrentShardRequests = request.paramAsInt("max_concurrent_shard_requests", + searchRequest.getMaxConcurrentShardRequests()); + searchRequest.setMaxConcurrentShardRequests(maxConcurrentShardRequests); + } // do not allow 'query_and_fetch' or 'dfs_query_and_fetch' search types // from the REST layer. these modes are an internal optimization and should diff --git a/docs/reference/search/search.asciidoc b/docs/reference/search/search.asciidoc index 41ba6e5c87ab8..816cf54c5290c 100644 --- a/docs/reference/search/search.asciidoc +++ b/docs/reference/search/search.asciidoc @@ -67,3 +67,10 @@ CPU and memory wise. It is usually a better idea to organize data in such a way that there are fewer larger shards. In case you would like to configure a soft limit, you can update the `action.search.shard_count.limit` cluster setting in order to reject search requests that hit too many shards. + +The search's `max_concurrent_shard_requests` request parameter can be used to control +the maximum number of concurrent shard requests the search API will execute for this request. +This parameter should be used to protect a singe request from overloading a cluster ie. a default +request will hit all indices in a cluster which could cause shard request rejections if the +number of shards per node is high. This default is based on the number of data nodes in +the cluster but at most `256`. \ No newline at end of file diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json index b00bc821040a1..92086d26a2768 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json @@ -167,8 +167,7 @@ "max_concurrent_shard_requests" : { "type" : "number", "description" : "The number of concurrent shard requests this search executes concurrently. This value should be used to limit the impact of the search on the cluster in order to limit the number of concurrent shard requests", - - "default" : 256 + "default" : "The default grows with the number of nodes in the cluster but is at most 256." } } }, diff --git a/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java b/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java index b9f3ce3b68209..7076a80f29bbb 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java +++ b/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java @@ -56,14 +56,21 @@ public RandomizingClient(Client client, Random random) { defaultPreference = null; } this.batchedReduceSize = 2 + random.nextInt(10); - this.maxConcurrentShardRequests = 1 + random.nextInt(1 << random.nextInt(8)); - + if (random.nextBoolean()) { + this.maxConcurrentShardRequests = 1 + random.nextInt(1 << random.nextInt(8)); + } else { + this.maxConcurrentShardRequests = -1; // randomly use the default + } } @Override public SearchRequestBuilder prepareSearch(String... indices) { - return in.prepareSearch(indices).setSearchType(defaultSearchType).setPreference(defaultPreference) - .setBatchedReduceSize(batchedReduceSize).setMaxConcurrentShardRequests(maxConcurrentShardRequests); + SearchRequestBuilder searchRequestBuilder = in.prepareSearch(indices).setSearchType(defaultSearchType) + .setPreference(defaultPreference).setBatchedReduceSize(batchedReduceSize); + if (maxConcurrentShardRequests != -1) { + searchRequestBuilder.setMaxConcurrentShardRequests(maxConcurrentShardRequests); + } + return searchRequestBuilder; } @Override From 346c71f34bf77549a6c39728bc751e853cabc541 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 11 Jul 2017 11:23:59 +0200 Subject: [PATCH 6/7] rename variable --- .../elasticsearch/action/search/InitialSearchPhase.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java index 5fcd8833df423..1636236525015 100644 --- a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java +++ b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java @@ -51,7 +51,7 @@ abstract class InitialSearchPhase extends private final int expectedTotalOps; private final AtomicInteger totalOps = new AtomicInteger(); private final AtomicInteger shardExecutionIndex = new AtomicInteger(0); - private final int concurrentRunnables; + private final int maxConcurrentShardRequests; InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator shardsIts, Logger logger) { super(name); @@ -63,7 +63,7 @@ abstract class InitialSearchPhase extends // on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result // we process hence we add one for the non active partition here. this.expectedTotalOps = shardsIts.totalSizeWith1ForEmpty(); - concurrentRunnables = Math.min(request.getMaxConcurrentShardRequests(), shardsIts.size()); + maxConcurrentShardRequests = Math.min(request.getMaxConcurrentShardRequests(), shardsIts.size()); } private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId, @@ -128,9 +128,9 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Override public final void run() throws IOException { - boolean success = shardExecutionIndex.compareAndSet(0, concurrentRunnables); + boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests); assert success; - for (int i = 0; i < concurrentRunnables; i++) { + for (int i = 0; i < maxConcurrentShardRequests; i++) { SearchShardIterator shardRoutings = shardsIts.get(i); performPhaseOnShard(i, shardRoutings, shardRoutings.nextOrNull()); } From a119abfb9bbb0f7afbc121fe402283ad7f046e0b Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 11 Jul 2017 11:34:33 +0200 Subject: [PATCH 7/7] remove newlines --- .../java/org/elasticsearch/action/search/SearchRequest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java index f5eddacafd15f..02cde220b3562 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -383,8 +383,6 @@ public void readFrom(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_6_0_0_beta1)) { maxConcurrentShardRequests = in.readVInt(); } - - } @Override