diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 039b46b5c7a52..5897351fa01fe 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -39,12 +39,7 @@ import org.elasticsearch.node.settings.NodeSettingsService; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Queue; +import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -336,6 +331,9 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde } return new ExecutorHolder(DIRECT_EXECUTOR, new Info(name, type)); } else if ("cached".equals(type)) { + if (!Names.GENERIC.equals(name)) { + throw new IllegalArgumentException("thread pool type cached is reserved only for the generic thread pool and can not be applied to [" + name + "]"); + } TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)); if (previousExecutorHolder != null) { if ("cached".equals(previousInfo.getType())) { diff --git a/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java index 5e866125e96a9..adbb53741dd29 100644 --- a/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -411,7 +411,7 @@ private Settings getRandomNodeSettings(long seed) { ThreadPool.Names.PERCOLATE, ThreadPool.Names.REFRESH, ThreadPool.Names.SEARCH, ThreadPool.Names.SNAPSHOT, ThreadPool.Names.SUGGEST, ThreadPool.Names.WARMER)) { if (random.nextBoolean()) { - final String type = RandomPicks.randomFrom(random, Arrays.asList("fixed", "cached", "scaling")); + final String type = RandomPicks.randomFrom(random, Arrays.asList("fixed", "scaling")); builder.put(ThreadPool.THREADPOOL_GROUP + name + ".type", type); } } diff --git a/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java b/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java index d52f67dc82ca7..61b5ec873eb12 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java +++ b/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java @@ -67,7 +67,7 @@ public class SimpleThreadPoolIT extends ESIntegTestCase { @Override protected Settings nodeSettings(int nodeOrdinal) { - return Settings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put("threadpool.search.type", "cached").build(); + return Settings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put("threadpool.search.type", "scaling").build(); } public void testThreadNames() throws Exception { diff --git a/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java b/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java index cd252b60d715c..e7f0548f8f909 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java +++ b/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java @@ -25,6 +25,10 @@ import org.elasticsearch.threadpool.ThreadPool.Names; import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; @@ -50,59 +54,23 @@ private ThreadPool.Info info(ThreadPool threadPool, String name) { return null; } - public void testCachedExecutorType() throws InterruptedException { - ThreadPool threadPool = new ThreadPool( - Settings.settingsBuilder() - .put("threadpool.search.type", "cached") - .put("name","testCachedExecutorType").build()); - - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached")); - assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(5L)); - assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); - - // Replace with different type - threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "same").build()); - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("same")); - assertThat(threadPool.executor(Names.SEARCH), is(ThreadPool.DIRECT_EXECUTOR)); - - // Replace with different type again - threadPool.updateSettings(settingsBuilder() - .put("threadpool.search.type", "scaling") - .put("threadpool.search.keep_alive", "10m") - .build()); - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling")); - assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(1)); - // Make sure keep alive value changed - assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); - - // Put old type back - threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "cached").build()); - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached")); - // Make sure keep alive value reused - assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L)); - assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); - - // Change keep alive - Executor oldExecutor = threadPool.executor(Names.SEARCH); - threadPool.updateSettings(settingsBuilder().put("threadpool.search.keep_alive", "1m").build()); - // Make sure keep alive value changed - assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(1L)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L)); - // Make sure executor didn't change - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached")); - assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); - - // Set the same keep alive - threadPool.updateSettings(settingsBuilder().put("threadpool.search.keep_alive", "1m").build()); - // Make sure keep alive value didn't change - assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(1L)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L)); - // Make sure executor didn't change - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached")); - assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); - terminate(threadPool); + public void testCachedExecutorType() throws InterruptedException, IllegalAccessException { + Set exceptions = new HashSet<>(Arrays.asList(Names.GENERIC, Names.SAME)); + Field[] fields = Names.class.getDeclaredFields(); + for (Field field : fields) { + String name = (String)field.get(null); + if (field.getType().equals(String.class) && Modifier.isStatic(field.getModifiers()) && !exceptions.contains(name)) { + try { + new ThreadPool( + Settings.settingsBuilder() + .put("threadpool." + name + ".type", "cached") + .put("name", "testCachedExecutorType").build()); + fail("thread pool type cached is reserved for the generic thread pool but was able to set for thread pool " + name); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), is("thread pool type cached is reserved only for the generic thread pool and can not be applied to [" + name + "]")); + } + } + } } public void testFixedExecutorType() throws InterruptedException { @@ -199,7 +167,7 @@ public void testScalingExecutorType() throws InterruptedException { public void testShutdownNowInterrupts() throws Exception { ThreadPool threadPool = new ThreadPool(Settings.settingsBuilder() - .put("threadpool.search.type", "cached") + .put("threadpool.search.type", "scaling") .put("name","testCachedExecutorType").build()); final CountDownLatch latch = new CountDownLatch(1); @@ -227,7 +195,7 @@ public void run() { public void testCustomThreadPool() throws Exception { ThreadPool threadPool = new ThreadPool(Settings.settingsBuilder() - .put("threadpool.my_pool1.type", "cached") + .put("threadpool.my_pool1.type", "scaling") .put("threadpool.my_pool2.type", "fixed") .put("threadpool.my_pool2.size", "1") .put("threadpool.my_pool2.queue_size", "1") @@ -239,7 +207,7 @@ public void testCustomThreadPool() throws Exception { outer: for (ThreadPool.Info info : groups) { if ("my_pool1".equals(info.getName())) { foundPool1 = true; - assertThat(info.getType(), equalTo("cached")); + assertThat(info.getType(), equalTo("scaling")); } else if ("my_pool2".equals(info.getName())) { foundPool2 = true; assertThat(info.getType(), equalTo("fixed")); @@ -271,7 +239,7 @@ public void testCustomThreadPool() throws Exception { outer: for (ThreadPool.Info info : groups) { if ("my_pool1".equals(info.getName())) { foundPool1 = true; - assertThat(info.getType(), equalTo("cached")); + assertThat(info.getType(), equalTo("scaling")); } else if ("my_pool2".equals(info.getName())) { foundPool2 = true; assertThat(info.getMax(), equalTo(10)); diff --git a/docs/reference/migration/migrate_3_0.asciidoc b/docs/reference/migration/migrate_3_0.asciidoc index 30c1c38aae835..c25ea5b0955dc 100644 --- a/docs/reference/migration/migrate_3_0.asciidoc +++ b/docs/reference/migration/migrate_3_0.asciidoc @@ -376,4 +376,13 @@ request cache and the field data cache. This setting would arbitrarily pick the first interface not marked as loopback. Instead, specify by address scope (e.g. `_local_,_site_` for all loopback and private network addresses) or by explicit interface names, -hostnames, or addresses. +hostnames, or addresses. + +=== Forbid thread pool from being of type `cached` + +Previously, <> could be of type `cached`, `fixed`, or `scaling`. The `cached` type is +generally dangerous because it is an unbounded thread pool. The ability to set a thread pool to be of type `cached` has +been removed; the `cached` thread pool remains and is reserved only for the <> thread pool +because operations submitted to this thread pool must not block and can not be rejected. Elasticsearch will throw an +`IllegalArgumentException` if the `cached` thread pool type is applied to any thread pool other than the `generic` +thread pool. diff --git a/docs/reference/modules/threadpool.asciidoc b/docs/reference/modules/threadpool.asciidoc index 591277889bc3d..2afe7bce1e5b6 100644 --- a/docs/reference/modules/threadpool.asciidoc +++ b/docs/reference/modules/threadpool.asciidoc @@ -9,6 +9,9 @@ of discarded. There are several thread pools, but the important ones include: +`generic`:: + For generic operations (e.g., background node discovery). + `index`:: For index/delete operations. Defaults to `fixed` with a size of `# of available processors`, @@ -67,7 +70,7 @@ threadpool: size: 30 -------------------------------------------------- -NOTE: you can update threadpool settings live using +NOTE: you can update thread pool settings live using <>. @@ -79,18 +82,16 @@ The following are the types of thread pools that can be used and their respective parameters: [float] -==== `cache` - -The `cache` thread pool is an unbounded thread pool that will spawn a -thread if there are pending requests. Here is an example of how to set -it: - -[source,js] --------------------------------------------------- -threadpool: - index: - type: cached --------------------------------------------------- +==== `cached` + +The `cached` thread pool is an unbounded thread pool that will spawn a +thread if there are pending requests. This thread pool is used to +prevent requests submitted to this pool from blocking or being +rejected. Unused threads in this thread pool will be terminated after +a keep alive expires (defaults to thirty seconds). The `cached` thread +pool is reserved for the <> thread pool +(which must be of type `cached`) and can not be applied to any other +thread pool. [float] ==== `fixed`