diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 5d0c814a285cf..9b8b9331b9ebb 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -458,7 +458,7 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde if (ThreadPoolType.FIXED == previousInfo.getThreadPoolType()) { SizeValue updatedQueueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", previousInfo.getQueueSize()))); if (Objects.equals(previousInfo.getQueueSize(), updatedQueueSize)) { - int updatedSize = settings.getAsInt("size", previousInfo.getMax()); + int updatedSize = applyHardSizeLimit(name, settings.getAsInt("size", previousInfo.getMax())); if (previousInfo.getMax() != updatedSize) { logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, updatedSize, updatedQueueSize); // if you think this code is crazy: that's because it is! @@ -480,7 +480,7 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde defaultQueueSize = previousInfo.getQueueSize(); } - int size = settings.getAsInt("size", defaultSize); + int size = applyHardSizeLimit(name, settings.getAsInt("size", defaultSize)); SizeValue queueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", defaultQueueSize))); logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize); Executor executor = EsExecutors.newFixed(name, size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory); @@ -533,6 +533,21 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde throw new IllegalArgumentException("No type found [" + type + "], for [" + name + "]"); } + private int applyHardSizeLimit(String name, int size) { + int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); + if ((name.equals(Names.BULK) || name.equals(Names.INDEX)) && size > availableProcessors) { + // We use a hard max size for the indexing pools, because if too many threads enter Lucene's IndexWriter, it means + // too many segments written, too frequently, too much merging, etc: + // TODO: I would love to be loud here (throw an exception if you ask for a too-big size), but I think this is dangerous + // because on upgrade this setting could be in cluster state and hard for the user to correct? + logger.warn("requested thread pool size [{}] for [{}] is too large; setting to maximum [{}] instead", + size, name, availableProcessors); + size = availableProcessors; + } + + return size; + } + private void updateSettings(Settings settings) { Map groupSettings = settings.getAsGroups(); if (groupSettings.isEmpty()) { diff --git a/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java b/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java index e1b1c4451c959..09653c12e0795 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java +++ b/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool.Names; @@ -89,6 +90,51 @@ public void testThreadPoolCanNotOverrideThreadPoolType() throws InterruptedExcep } } + public void testIndexingThreadPoolsMaxSize() throws InterruptedException { + String threadPoolName = randomThreadPoolName(); + for (String name : new String[] {ThreadPool.Names.BULK, ThreadPool.Names.INDEX}) { + ThreadPool threadPool = null; + try { + + int maxSize = EsExecutors.boundedNumberOfProcessors(Settings.EMPTY); + + // try to create a too-big (maxSize+1) thread pool + threadPool = new ThreadPool(settingsBuilder() + .put("name", "testIndexingThreadPoolsMaxSize") + .put("threadpool." + name + ".size", maxSize+1) + .build()); + + // confirm it clipped us at the maxSize: + assertEquals(maxSize, ((ThreadPoolExecutor) threadPool.executor(name)).getMaximumPoolSize()); + + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + threadPool.setClusterSettings(clusterSettings); + + // update it to a tiny size: + clusterSettings.applySettings( + settingsBuilder() + .put("threadpool." + name + ".size", 1) + .build() + ); + + // confirm it worked: + assertEquals(1, ((ThreadPoolExecutor) threadPool.executor(name)).getMaximumPoolSize()); + + // try to update to too-big size: + clusterSettings.applySettings( + settingsBuilder() + .put("threadpool." + name + ".size", maxSize+1) + .build() + ); + + // confirm it clipped us at the maxSize: + assertEquals(maxSize, ((ThreadPoolExecutor) threadPool.executor(name)).getMaximumPoolSize()); + } finally { + terminateThreadPoolIfNeeded(threadPool); + } + } + } + public void testUpdateSettingsCanNotChangeThreadPoolType() throws InterruptedException { String threadPoolName = randomThreadPoolName(); ThreadPool.ThreadPoolType invalidThreadPoolType = randomIncorrectThreadPoolType(threadPoolName); @@ -165,6 +211,14 @@ public void testCachedExecutorType() throws InterruptedException { } } + private static int getExpectedThreadPoolSize(Settings settings, String name, int size) { + if (name.equals(ThreadPool.Names.BULK) || name.equals(ThreadPool.Names.INDEX)) { + return Math.min(size, EsExecutors.boundedNumberOfProcessors(settings)); + } else { + return size; + } + } + public void testFixedExecutorType() throws InterruptedException { String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.FIXED); ThreadPool threadPool = null; @@ -179,12 +233,14 @@ public void testFixedExecutorType() throws InterruptedException { Settings settings = clusterSettings.applySettings(settingsBuilder() .put("threadpool." + threadPoolName + ".size", "15") .build()); + + int expectedSize = getExpectedThreadPoolSize(nodeSettings, threadPoolName, 15); assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED); assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(15)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(15)); - assertThat(info(threadPool, threadPoolName).getMin(), equalTo(15)); - assertThat(info(threadPool, threadPoolName).getMax(), equalTo(15)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(expectedSize)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(expectedSize)); + assertThat(info(threadPool, threadPoolName).getMin(), equalTo(expectedSize)); + assertThat(info(threadPool, threadPoolName).getMax(), equalTo(expectedSize)); // keep alive does not apply to fixed thread pools assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(0L)); @@ -194,20 +250,23 @@ public void testFixedExecutorType() throws InterruptedException { // Make sure keep alive value is not used assertThat(info(threadPool, threadPoolName).getKeepAlive(), nullValue()); // Make sure keep pool size value were reused - assertThat(info(threadPool, threadPoolName).getMin(), equalTo(15)); - assertThat(info(threadPool, threadPoolName).getMax(), equalTo(15)); + assertThat(info(threadPool, threadPoolName).getMin(), equalTo(expectedSize)); + assertThat(info(threadPool, threadPoolName).getMax(), equalTo(expectedSize)); assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(15)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(15)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(expectedSize)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(expectedSize)); // Change size Executor oldExecutor = threadPool.executor(threadPoolName); settings = clusterSettings.applySettings(settingsBuilder().put(settings).put("threadpool." + threadPoolName + ".size", "10").build()); + + expectedSize = getExpectedThreadPoolSize(nodeSettings, threadPoolName, 10); + // Make sure size values changed - assertThat(info(threadPool, threadPoolName).getMax(), equalTo(10)); - assertThat(info(threadPool, threadPoolName).getMin(), equalTo(10)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(10)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(10)); + assertThat(info(threadPool, threadPoolName).getMax(), equalTo(expectedSize)); + assertThat(info(threadPool, threadPoolName).getMin(), equalTo(expectedSize)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(expectedSize)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(expectedSize)); // Make sure executor didn't change assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED); assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor));