Limit the number of concurrent shard requests per search request #25632
Conversation
This is a protection mechanism to prevent a single search request from hitting a large number of shards in the cluster concurrently. If a search is executed against all indices in the cluster, it can easily overload the cluster and cause rejections etc., which is not necessarily desirable. Instead, this PR adds a per-request limit, `max_concurrent_shard_requests`, that throttles the number of concurrent initial-phase requests to `256` by default. This limit can be increased per request and protects single search requests from overloading the cluster. Subsequent PRs can introduce additional improvements, e.g. limiting this at the `_msearch` level, making the default a factor of the number of nodes, or sorting shard iterators such that we gain the best concurrency across nodes.
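The throttling idea described above can be sketched as a bounded dispatcher: start up to the limit, then launch the next pending shard request as each one completes. The following is a minimal, hypothetical sketch (the class and method names are illustrative, not the actual Elasticsearch implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch (not the actual Elasticsearch code) of the throttle:
// at most maxConcurrentShardRequests shard-level requests are in flight at
// once; each completion launches the next pending one.
public class ShardRequestThrottle {
    private final List<Runnable> shardRequests;
    private final AtomicInteger shardExecutionIndex = new AtomicInteger();
    private final int maxConcurrentShardRequests;

    public ShardRequestThrottle(List<Runnable> shardRequests, int maxConcurrentShardRequests) {
        this.shardRequests = shardRequests;
        this.maxConcurrentShardRequests = maxConcurrentShardRequests;
    }

    public void start() {
        // launch the initial batch, bounded by the per-request limit
        int initialBatch = Math.min(maxConcurrentShardRequests, shardRequests.size());
        boolean success = shardExecutionIndex.compareAndSet(0, initialBatch);
        assert success;
        for (int i = 0; i < initialBatch; i++) {
            shardRequests.get(i).run();
        }
    }

    // invoked when a shard request completes: fires the next pending one
    public void onShardResponse() {
        int index = shardExecutionIndex.getAndIncrement();
        if (index < shardRequests.size()) {
            shardRequests.get(index).run();
        }
    }

    public static void main(String[] args) {
        AtomicInteger started = new AtomicInteger();
        List<Runnable> requests = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            requests.add(started::incrementAndGet);
        }
        ShardRequestThrottle throttle = new ShardRequestThrottle(requests, 3);
        throttle.start();
        System.out.println("started after start(): " + started.get()); // 3
        throttle.onShardResponse();
        System.out.println("started after one completion: " + started.get()); // 4
    }
}
```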
jpountz
left a comment
I left some minor comments.
```java
private int batchedReduceSize = 512;

private int maxNunConcurrentShardRequests = 256;
```
nun -> num
```java
 * Sets the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to
 * reduce the number of shard reqeusts fired per high level search request. Searches that hit the entire cluster can be throttled
 * with this number to reduce the cluster load. The default is <tt>256</tt>
 */
```
s/nun/num/
or maybe rather remove Num from the variable name since the same variable seems to be called maxConcurrentShardRequests elsewhere and in the dsl, so it would be more consistent?
```java
final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize());
searchRequest.setBatchedReduceSize(batchedReduceSize);

final int maxConcurrentShardRequests = request.paramAsInt("max_concurrent_shard_requests", searchRequest.getBatchedReduceSize());
```
the default value looks wrong
```json
},
"max_concurrent_shard_requests" : {
  "type" : "number",
  "description" : "The number of concurrent shard requests this search executes concurrently. This values should be used to limitthe impact of the search on the cluster in order to limit the number of concurrent shard requests",
```
s/values/value/; s/limitthe/limit the/
```java
private final SearchType defaultSearchType;
private final String defaultPreference;
private final int batchedReduceSize;
private final int maxConcurrentRequests;
```
maybe add "shard" to the name of this variable for consistency?
```java
    defaultPreference = null;
}
this.batchedReduceSize = 2 + random.nextInt(10);
this.maxConcurrentRequests = 1 + random.nextInt(100);
```
can we do something that is more likely to trigger interesting behaviours, such as `1 + random.nextInt(1 << random.nextInt(8))`?
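The suggested expression draws a random exponent first, so small limits are sampled far more often than with a uniform draw while the full range `1..128` is still covered. A quick sketch of its behaviour (the class and method names are illustrative):

```java
import java.util.Random;

// Sketch of the suggested expression: drawing a random exponent first skews
// the distribution toward small values, so tests exercise tiny concurrency
// limits (which trigger interesting throttling behaviour) far more often,
// while still occasionally covering large ones up to 128.
public class SkewedRandom {
    static int skewedLimit(Random random) {
        return 1 + random.nextInt(1 << random.nextInt(8));
    }

    public static void main(String[] args) {
        Random random = new Random();
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        for (int i = 0; i < 100_000; i++) {
            int value = skewedLimit(random);
            min = Math.min(min, value);
            max = Math.max(max, value);
        }
        // the exponent ranges over [0, 8), so values fall in [1, 128]
        System.out.println("observed range: [" + min + ", " + max + "]");
    }
}
```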
```java
boolean success = shardExecutionIndex.compareAndSet(0, concurrentRunnables);
assert success;
for (int i = 0; i < concurrentRunnables; i++) {
    int index = i;
```
let's just use i directly? I presume a previous iteration of this work used a lambda that needed a final variable?
jimczi
left a comment
I like it because it is simple, but it seems difficult to pick a good value for the limit. The main unknown for the user is the number of nodes that the search will hit, so I thought that we would apply this limit per targeted node. It can be a follow-up, but I think it would make the usage of this much simpler, since the goal is to not overload a single node. By doing that we could lower the default value (say 64, which is already high if it's per node) and still get fast search when you have a lot of nodes?
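The per-node variant described here could be sketched roughly as follows: track in-flight requests per target node and only dispatch when the node's count is below the limit. This is a hypothetical illustration of the proposal, not code from the PR; all names are made up:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the per-node idea: cap in-flight shard requests per
// target node rather than per search request, so the default can be much
// lower (e.g. 64) without slowing searches on large clusters.
public class PerNodeThrottle {
    private final Map<String, Integer> inFlight = new HashMap<>();
    private final int maxPerNode;

    public PerNodeThrottle(int maxPerNode) {
        this.maxPerNode = maxPerNode;
    }

    // returns true if a shard request to this node may be dispatched now
    public synchronized boolean tryAcquire(String nodeId) {
        int current = inFlight.getOrDefault(nodeId, 0);
        if (current >= maxPerNode) {
            return false; // node saturated: caller would queue the shard request
        }
        inFlight.put(nodeId, current + 1);
        return true;
    }

    // called when a shard request to this node completes
    public synchronized void release(String nodeId) {
        inFlight.merge(nodeId, -1, Integer::sum);
    }

    public static void main(String[] args) {
        PerNodeThrottle throttle = new PerNodeThrottle(2);
        System.out.println(throttle.tryAcquire("node1")); // true
        System.out.println(throttle.tryAcquire("node1")); // true
        System.out.println(throttle.tryAcquire("node1")); // false: node1 full
        System.out.println(throttle.tryAcquire("node2")); // true: other node
    }
}
```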
jimczi
left a comment
LGTM
> Subsequent PRs can introduce additional improvements, i.e. limiting this on a `_msearch` level, making defaults a factor of the number of nodes, or sorting shard iters such that we gain the best concurrency across nodes.
The shard iters are sorted by shard number first, then by index, which is a good heuristic to diversify the nodes in the group iteration. We can try to do a better job, but after checking the code I think this is much nicer and simpler as it is ;).

Also, the setting is currently only applied to `query_then_fetch` queries; should we have a follow-up to apply the setting to `dfs_query_then_fetch` as well? As is, this PR throttles the dfs queries but not the query phase?

hehe, good catch, I was curious if reviewers would catch it :) I agree, I will do that too.
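The ordering heuristic described here (shard number first, then index) interleaves the indices, so the first `max_concurrent_shard_requests` slots tend to hit different shards and, likely, different nodes. A hypothetical sketch of that comparator, with a made-up `ShardRef` standing in for a shard iterator:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of the ordering heuristic: sorting by shard number first, then index
// name, interleaves indices so consecutive shard requests are spread across
// different shards (and likely different nodes).
public class ShardSortSketch {
    // hypothetical stand-in for a shard iterator
    record ShardRef(String index, int shardId) {}

    static List<ShardRef> sortForConcurrency(List<ShardRef> shards) {
        List<ShardRef> sorted = new ArrayList<>(shards);
        sorted.sort(Comparator.comparingInt(ShardRef::shardId)
                .thenComparing(ShardRef::index));
        return sorted;
    }

    public static void main(String[] args) {
        List<ShardRef> shards = List.of(
                new ShardRef("logs", 0), new ShardRef("logs", 1),
                new ShardRef("metrics", 0), new ShardRef("metrics", 1));
        // interleaved order: logs/0, metrics/0, logs/1, metrics/1
        System.out.println(sortForConcurrency(shards));
    }
}
```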
jimczi
left a comment
I like the new heuristic with the node count and the default number of shards.
+1
* master:
  - Add an underscore to flood stage setting
  - Avoid failing install if system-sysctl is masked
  - Add another parent value option to join documentation (elastic#25609)
  - Ensure we rewrite common queries to `match_none` if possible (elastic#25650)
  - Remove reference to field-stats docs.
  - Optimize the order of bytes in uuids for better compression. (elastic#24615)
  - Fix BytesReferenceStreamInput#skip with offset (elastic#25634)
  - Limit the number of concurrent shard requests per search request (elastic#25632)