Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,7 @@
import org.elasticsearch.node.settings.NodeSettingsService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -336,6 +331,9 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde
}
return new ExecutorHolder(DIRECT_EXECUTOR, new Info(name, type));
} else if ("cached".equals(type)) {
if (!Names.GENERIC.equals(name)) {
throw new IllegalArgumentException("thread pool type cached is reserved only for the generic thread pool and can not be applied to [" + name + "]");
}
TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
if (previousExecutorHolder != null) {
if ("cached".equals(previousInfo.getType())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ private Settings getRandomNodeSettings(long seed) {
ThreadPool.Names.PERCOLATE, ThreadPool.Names.REFRESH, ThreadPool.Names.SEARCH, ThreadPool.Names.SNAPSHOT,
ThreadPool.Names.SUGGEST, ThreadPool.Names.WARMER)) {
if (random.nextBoolean()) {
final String type = RandomPicks.randomFrom(random, Arrays.asList("fixed", "cached", "scaling"));
final String type = RandomPicks.randomFrom(random, Arrays.asList("fixed", "scaling"));
builder.put(ThreadPool.THREADPOOL_GROUP + name + ".type", type);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
public class SimpleThreadPoolIT extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put("threadpool.search.type", "cached").build();
return Settings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put("threadpool.search.type", "scaling").build();
}

public void testThreadNames() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
import org.elasticsearch.threadpool.ThreadPool.Names;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -50,59 +54,23 @@ private ThreadPool.Info info(ThreadPool threadPool, String name) {
return null;
}

public void testCachedExecutorType() throws InterruptedException {
ThreadPool threadPool = new ThreadPool(
Settings.settingsBuilder()
.put("threadpool.search.type", "cached")
.put("name","testCachedExecutorType").build());

assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached"));
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(5L));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));

// Replace with different type
threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "same").build());
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("same"));
assertThat(threadPool.executor(Names.SEARCH), is(ThreadPool.DIRECT_EXECUTOR));

// Replace with different type again
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.type", "scaling")
.put("threadpool.search.keep_alive", "10m")
.build());
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(1));
// Make sure keep alive value changed
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));

// Put old type back
threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "cached").build());
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached"));
// Make sure keep alive value reused
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));

// Change keep alive
Executor oldExecutor = threadPool.executor(Names.SEARCH);
threadPool.updateSettings(settingsBuilder().put("threadpool.search.keep_alive", "1m").build());
// Make sure keep alive value changed
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(1L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L));
// Make sure executor didn't change
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached"));
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));

// Set the same keep alive
threadPool.updateSettings(settingsBuilder().put("threadpool.search.keep_alive", "1m").build());
// Make sure keep alive value didn't change
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(1L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L));
// Make sure executor didn't change
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached"));
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
terminate(threadPool);
public void testCachedExecutorType() throws InterruptedException, IllegalAccessException {
Set<String> exceptions = new HashSet<>(Arrays.asList(Names.GENERIC, Names.SAME));
Field[] fields = Names.class.getDeclaredFields();
for (Field field : fields) {
String name = (String)field.get(null);
if (field.getType().equals(String.class) && Modifier.isStatic(field.getModifiers()) && !exceptions.contains(name)) {
try {
new ThreadPool(
Settings.settingsBuilder()
.put("threadpool." + name + ".type", "cached")
.put("name", "testCachedExecutorType").build());
fail("thread pool type cached is reserved for the generic thread pool but was able to set for thread pool " + name);
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), is("thread pool type cached is reserved only for the generic thread pool and can not be applied to [" + name + "]"));
}
}
}
}

public void testFixedExecutorType() throws InterruptedException {
Expand Down Expand Up @@ -199,7 +167,7 @@ public void testScalingExecutorType() throws InterruptedException {

public void testShutdownNowInterrupts() throws Exception {
ThreadPool threadPool = new ThreadPool(Settings.settingsBuilder()
.put("threadpool.search.type", "cached")
.put("threadpool.search.type", "scaling")
.put("name","testCachedExecutorType").build());

final CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -227,7 +195,7 @@ public void run() {

public void testCustomThreadPool() throws Exception {
ThreadPool threadPool = new ThreadPool(Settings.settingsBuilder()
.put("threadpool.my_pool1.type", "cached")
.put("threadpool.my_pool1.type", "scaling")
.put("threadpool.my_pool2.type", "fixed")
.put("threadpool.my_pool2.size", "1")
.put("threadpool.my_pool2.queue_size", "1")
Expand All @@ -239,7 +207,7 @@ public void testCustomThreadPool() throws Exception {
outer: for (ThreadPool.Info info : groups) {
if ("my_pool1".equals(info.getName())) {
foundPool1 = true;
assertThat(info.getType(), equalTo("cached"));
assertThat(info.getType(), equalTo("scaling"));
} else if ("my_pool2".equals(info.getName())) {
foundPool2 = true;
assertThat(info.getType(), equalTo("fixed"));
Expand Down Expand Up @@ -271,7 +239,7 @@ public void testCustomThreadPool() throws Exception {
outer: for (ThreadPool.Info info : groups) {
if ("my_pool1".equals(info.getName())) {
foundPool1 = true;
assertThat(info.getType(), equalTo("cached"));
assertThat(info.getType(), equalTo("scaling"));
} else if ("my_pool2".equals(info.getName())) {
foundPool2 = true;
assertThat(info.getMax(), equalTo(10));
Expand Down
11 changes: 10 additions & 1 deletion docs/reference/migration/migrate_3_0.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -376,4 +376,13 @@ request cache and the field data cache.

This setting would arbitrarily pick the first interface not marked as loopback. Instead, specify by address
scope (e.g. `_local_,_site_` for all loopback and private network addresses) or by explicit interface names,
hostnames, or addresses.
hostnames, or addresses.

=== Forbid thread pool from being of type `cached`

Previously, <<modules-threadpool,thread pools>> could be of type `cached`, `fixed`, or `scaling`. The `cached` type is
generally dangerous because it is an unbounded thread pool. The ability to set a thread pool to be of type `cached` has
been removed; the `cached` thread pool remains and is reserved only for the <<modules-threadpool,`generic`>> thread pool
because operations submitted to this thread pool must not block and can not be rejected. Elasticsearch will throw an
`IllegalArgumentException` if the `cached` thread pool type is applied to any thread pool other than the `generic`
thread pool.
27 changes: 14 additions & 13 deletions docs/reference/modules/threadpool.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ of discarded.

There are several thread pools, but the important ones include:

`generic`::
For generic operations (e.g., background node discovery).

`index`::
For index/delete operations. Defaults to `fixed`
with a size of `# of available processors`,
Expand Down Expand Up @@ -67,7 +70,7 @@ threadpool:
size: 30
--------------------------------------------------

NOTE: you can update threadpool settings live using
NOTE: you can update thread pool settings live using
<<cluster-update-settings>>.


Expand All @@ -79,18 +82,16 @@ The following are the types of thread pools that can be used and their
respective parameters:

[float]
==== `cache`

The `cache` thread pool is an unbounded thread pool that will spawn a
thread if there are pending requests. Here is an example of how to set
it:

[source,js]
--------------------------------------------------
threadpool:
index:
type: cached
--------------------------------------------------
==== `cached`

The `cached` thread pool is an unbounded thread pool that will spawn a
thread if there are pending requests. This thread pool is used to
prevent requests submitted to this pool from blocking or being
rejected. Unused threads in this thread pool will be terminated after
a keep alive expires (defaults to thirty seconds). The `cached` thread
pool is reserved for the <<modules-threadpool,`generic`>> thread pool
(which must be of type `cached`) and can not be applied to any other
thread pool.

[float]
==== `fixed`
Expand Down