Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde
if (ThreadPoolType.FIXED == previousInfo.getThreadPoolType()) {
SizeValue updatedQueueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", previousInfo.getQueueSize())));
if (Objects.equals(previousInfo.getQueueSize(), updatedQueueSize)) {
int updatedSize = settings.getAsInt("size", previousInfo.getMax());
int updatedSize = applyHardSizeLimit(name, settings.getAsInt("size", previousInfo.getMax()));
if (previousInfo.getMax() != updatedSize) {
logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, updatedSize, updatedQueueSize);
// if you think this code is crazy: that's because it is!
Expand All @@ -480,7 +480,7 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde
defaultQueueSize = previousInfo.getQueueSize();
}

int size = settings.getAsInt("size", defaultSize);
int size = applyHardSizeLimit(name, settings.getAsInt("size", defaultSize));
SizeValue queueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", defaultQueueSize)));
logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize);
Executor executor = EsExecutors.newFixed(name, size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory);
Expand Down Expand Up @@ -533,6 +533,21 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde
throw new IllegalArgumentException("No type found [" + type + "], for [" + name + "]");
}

private int applyHardSizeLimit(String name, int size) {
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
if ((name.equals(Names.BULK) || name.equals(Names.INDEX)) && size > availableProcessors) {
// We use a hard max size for the indexing pools, because if too many threads enter Lucene's IndexWriter, it means
// too many segments written, too frequently, too much merging, etc:
// TODO: I would love to be loud here (throw an exception if you ask for a too-big size), but I think this is dangerous
// because on upgrade this setting could be in cluster state and hard for the user to correct?
logger.warn("requested thread pool size [{}] for [{}] is too large; setting to maximum [{}] instead",
size, name, availableProcessors);
size = availableProcessors;
}

return size;
}

private void updateSettings(Settings settings) {
Map<String, Settings> groupSettings = settings.getAsGroups();
if (groupSettings.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool.Names;
Expand Down Expand Up @@ -89,6 +90,51 @@ public void testThreadPoolCanNotOverrideThreadPoolType() throws InterruptedExcep
}
}

public void testIndexingThreadPoolsMaxSize() throws InterruptedException {
String threadPoolName = randomThreadPoolName();
for (String name : new String[] {ThreadPool.Names.BULK, ThreadPool.Names.INDEX}) {
ThreadPool threadPool = null;
try {

int maxSize = EsExecutors.boundedNumberOfProcessors(Settings.EMPTY);

// try to create a too-big (maxSize+1) thread pool
threadPool = new ThreadPool(settingsBuilder()
.put("name", "testIndexingThreadPoolsMaxSize")
.put("threadpool." + name + ".size", maxSize+1)
.build());

// confirm it clipped us at the maxSize:
assertEquals(maxSize, ((ThreadPoolExecutor) threadPool.executor(name)).getMaximumPoolSize());

ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
threadPool.setClusterSettings(clusterSettings);

// update it to a tiny size:
clusterSettings.applySettings(
settingsBuilder()
.put("threadpool." + name + ".size", 1)
.build()
);

// confirm it worked:
assertEquals(1, ((ThreadPoolExecutor) threadPool.executor(name)).getMaximumPoolSize());

// try to update to too-big size:
clusterSettings.applySettings(
settingsBuilder()
.put("threadpool." + name + ".size", maxSize+1)
.build()
);

// confirm it clipped us at the maxSize:
assertEquals(maxSize, ((ThreadPoolExecutor) threadPool.executor(name)).getMaximumPoolSize());
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
}
}

public void testUpdateSettingsCanNotChangeThreadPoolType() throws InterruptedException {
String threadPoolName = randomThreadPoolName();
ThreadPool.ThreadPoolType invalidThreadPoolType = randomIncorrectThreadPoolType(threadPoolName);
Expand Down Expand Up @@ -165,6 +211,14 @@ public void testCachedExecutorType() throws InterruptedException {
}
}

private static int getExpectedThreadPoolSize(Settings settings, String name, int size) {
if (name.equals(ThreadPool.Names.BULK) || name.equals(ThreadPool.Names.INDEX)) {
return Math.min(size, EsExecutors.boundedNumberOfProcessors(settings));
} else {
return size;
}
}

public void testFixedExecutorType() throws InterruptedException {
String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.FIXED);
ThreadPool threadPool = null;
Expand All @@ -179,12 +233,14 @@ public void testFixedExecutorType() throws InterruptedException {
Settings settings = clusterSettings.applySettings(settingsBuilder()
.put("threadpool." + threadPoolName + ".size", "15")
.build());

int expectedSize = getExpectedThreadPoolSize(nodeSettings, threadPoolName, 15);
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED);
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(15));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(15));
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(15));
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(15));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(expectedSize));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(expectedSize));
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(expectedSize));
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(expectedSize));
// keep alive does not apply to fixed thread pools
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(0L));

Expand All @@ -194,20 +250,23 @@ public void testFixedExecutorType() throws InterruptedException {
// Make sure keep alive value is not used
assertThat(info(threadPool, threadPoolName).getKeepAlive(), nullValue());
// Make sure keep pool size value were reused
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(15));
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(15));
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(expectedSize));
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(expectedSize));
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(15));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(15));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(expectedSize));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(expectedSize));

// Change size
Executor oldExecutor = threadPool.executor(threadPoolName);
settings = clusterSettings.applySettings(settingsBuilder().put(settings).put("threadpool." + threadPoolName + ".size", "10").build());

expectedSize = getExpectedThreadPoolSize(nodeSettings, threadPoolName, 10);

// Make sure size values changed
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(10));
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(10));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(10));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(10));
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(expectedSize));
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(expectedSize));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(expectedSize));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(expectedSize));
// Make sure executor didn't change
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED);
assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor));
Expand Down