Skip to content

Commit d3d41fc

Browse files
author
Michael McCandless
committed
Merge pull request #15585 from mikemccand/max_indexing_thread_pool_size
Limit the max size of bulk and index thread pools to bounded number of processors
2 parents 0b1a7fc + 8e7719d commit d3d41fc

File tree

2 files changed

+88
-14
lines changed

2 files changed

+88
-14
lines changed

core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde
458458
if (ThreadPoolType.FIXED == previousInfo.getThreadPoolType()) {
459459
SizeValue updatedQueueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", previousInfo.getQueueSize())));
460460
if (Objects.equals(previousInfo.getQueueSize(), updatedQueueSize)) {
461-
int updatedSize = settings.getAsInt("size", previousInfo.getMax());
461+
int updatedSize = applyHardSizeLimit(name, settings.getAsInt("size", previousInfo.getMax()));
462462
if (previousInfo.getMax() != updatedSize) {
463463
logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, updatedSize, updatedQueueSize);
464464
// if you think this code is crazy: that's because it is!
@@ -480,7 +480,7 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde
480480
defaultQueueSize = previousInfo.getQueueSize();
481481
}
482482

483-
int size = settings.getAsInt("size", defaultSize);
483+
int size = applyHardSizeLimit(name, settings.getAsInt("size", defaultSize));
484484
SizeValue queueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", defaultQueueSize)));
485485
logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize);
486486
Executor executor = EsExecutors.newFixed(name, size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory);
@@ -533,6 +533,21 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde
533533
throw new IllegalArgumentException("No type found [" + type + "], for [" + name + "]");
534534
}
535535

536+
private int applyHardSizeLimit(String name, int size) {
537+
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
538+
if ((name.equals(Names.BULK) || name.equals(Names.INDEX)) && size > availableProcessors) {
539+
// We use a hard max size for the indexing pools, because if too many threads enter Lucene's IndexWriter, it means
540+
// too many segments written, too frequently, too much merging, etc:
541+
// TODO: I would love to be loud here (throw an exception if you ask for a too-big size), but I think this is dangerous
542+
// because on upgrade this setting could be in cluster state and hard for the user to correct?
543+
logger.warn("requested thread pool size [{}] for [{}] is too large; setting to maximum [{}] instead",
544+
size, name, availableProcessors);
545+
size = availableProcessors;
546+
}
547+
548+
return size;
549+
}
550+
536551
private void updateSettings(Settings settings) {
537552
Map<String, Settings> groupSettings = settings.getAsGroups();
538553
if (groupSettings.isEmpty()) {

core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java

Lines changed: 71 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.common.settings.ClusterSettings;
2323
import org.elasticsearch.common.settings.Settings;
24+
import org.elasticsearch.common.util.concurrent.EsExecutors;
2425
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
2526
import org.elasticsearch.test.ESTestCase;
2627
import org.elasticsearch.threadpool.ThreadPool.Names;
@@ -89,6 +90,51 @@ public void testThreadPoolCanNotOverrideThreadPoolType() throws InterruptedExcep
8990
}
9091
}
9192

93+
public void testIndexingThreadPoolsMaxSize() throws InterruptedException {
94+
String threadPoolName = randomThreadPoolName();
95+
for (String name : new String[] {ThreadPool.Names.BULK, ThreadPool.Names.INDEX}) {
96+
ThreadPool threadPool = null;
97+
try {
98+
99+
int maxSize = EsExecutors.boundedNumberOfProcessors(Settings.EMPTY);
100+
101+
// try to create a too-big (maxSize+1) thread pool
102+
threadPool = new ThreadPool(settingsBuilder()
103+
.put("name", "testIndexingThreadPoolsMaxSize")
104+
.put("threadpool." + name + ".size", maxSize+1)
105+
.build());
106+
107+
// confirm it clipped us at the maxSize:
108+
assertEquals(maxSize, ((ThreadPoolExecutor) threadPool.executor(name)).getMaximumPoolSize());
109+
110+
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
111+
threadPool.setClusterSettings(clusterSettings);
112+
113+
// update it to a tiny size:
114+
clusterSettings.applySettings(
115+
settingsBuilder()
116+
.put("threadpool." + name + ".size", 1)
117+
.build()
118+
);
119+
120+
// confirm it worked:
121+
assertEquals(1, ((ThreadPoolExecutor) threadPool.executor(name)).getMaximumPoolSize());
122+
123+
// try to update to too-big size:
124+
clusterSettings.applySettings(
125+
settingsBuilder()
126+
.put("threadpool." + name + ".size", maxSize+1)
127+
.build()
128+
);
129+
130+
// confirm it clipped us at the maxSize:
131+
assertEquals(maxSize, ((ThreadPoolExecutor) threadPool.executor(name)).getMaximumPoolSize());
132+
} finally {
133+
terminateThreadPoolIfNeeded(threadPool);
134+
}
135+
}
136+
}
137+
92138
public void testUpdateSettingsCanNotChangeThreadPoolType() throws InterruptedException {
93139
String threadPoolName = randomThreadPoolName();
94140
ThreadPool.ThreadPoolType invalidThreadPoolType = randomIncorrectThreadPoolType(threadPoolName);
@@ -165,6 +211,14 @@ public void testCachedExecutorType() throws InterruptedException {
165211
}
166212
}
167213

214+
private static int getExpectedThreadPoolSize(Settings settings, String name, int size) {
215+
if (name.equals(ThreadPool.Names.BULK) || name.equals(ThreadPool.Names.INDEX)) {
216+
return Math.min(size, EsExecutors.boundedNumberOfProcessors(settings));
217+
} else {
218+
return size;
219+
}
220+
}
221+
168222
public void testFixedExecutorType() throws InterruptedException {
169223
String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.FIXED);
170224
ThreadPool threadPool = null;
@@ -179,12 +233,14 @@ public void testFixedExecutorType() throws InterruptedException {
179233
Settings settings = clusterSettings.applySettings(settingsBuilder()
180234
.put("threadpool." + threadPoolName + ".size", "15")
181235
.build());
236+
237+
int expectedSize = getExpectedThreadPoolSize(nodeSettings, threadPoolName, 15);
182238
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED);
183239
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
184-
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(15));
185-
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(15));
186-
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(15));
187-
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(15));
240+
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(expectedSize));
241+
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(expectedSize));
242+
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(expectedSize));
243+
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(expectedSize));
188244
// keep alive does not apply to fixed thread pools
189245
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(0L));
190246

@@ -194,20 +250,23 @@ public void testFixedExecutorType() throws InterruptedException {
194250
// Make sure keep alive value is not used
195251
assertThat(info(threadPool, threadPoolName).getKeepAlive(), nullValue());
196252
// Make sure keep pool size value were reused
197-
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(15));
198-
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(15));
253+
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(expectedSize));
254+
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(expectedSize));
199255
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
200-
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(15));
201-
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(15));
256+
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(expectedSize));
257+
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(expectedSize));
202258

203259
// Change size
204260
Executor oldExecutor = threadPool.executor(threadPoolName);
205261
settings = clusterSettings.applySettings(settingsBuilder().put(settings).put("threadpool." + threadPoolName + ".size", "10").build());
262+
263+
expectedSize = getExpectedThreadPoolSize(nodeSettings, threadPoolName, 10);
264+
206265
// Make sure size values changed
207-
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(10));
208-
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(10));
209-
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(10));
210-
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(10));
266+
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(expectedSize));
267+
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(expectedSize));
268+
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(expectedSize));
269+
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(expectedSize));
211270
// Make sure executor didn't change
212271
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED);
213272
assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor));

0 commit comments

Comments
 (0)