From e3b8dc71211cf4a52754213faf2e1f4cdbe9939c Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 28 Oct 2015 10:16:54 -0400 Subject: [PATCH] Forbid changing thread pool types This commit forbids the changing of thread pool types for any thread pool. The motivation here is that these are expert settings with little practical advantage. Closes #14294, relates #2509, relates #2858, relates #5152 --- .../elasticsearch/cluster/ClusterModule.java | 2 +- .../rest/action/cat/RestThreadPoolAction.java | 2 +- .../elasticsearch/threadpool/ThreadPool.java | 275 ++++++--- .../search/SearchWithRejectionsIT.java | 1 - .../threadpool/SimpleThreadPoolIT.java | 69 +-- .../ThreadPoolSerializationTests.java | 30 +- .../ThreadPoolTypeSettingsValidatorTests.java | 54 ++ .../UpdateThreadPoolSettingsTests.java | 547 ++++++++++-------- docs/reference/migration/migrate_3_0.asciidoc | 10 +- docs/reference/modules/threadpool.asciidoc | 63 +- .../test/InternalTestCluster.java | 42 +- 11 files changed, 676 insertions(+), 419 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/threadpool/ThreadPoolTypeSettingsValidatorTests.java diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 60a4913bb5e7a..31f5eb4a9213f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -177,7 +177,7 @@ private void registerBuiltinClusterSettings() { registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT, Validator.TIME_NON_NEGATIVE); registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT, Validator.TIME_NON_NEGATIVE); registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_MAX_SIZE_PER_SEC, Validator.BYTES_SIZE); - registerClusterDynamicSetting(ThreadPool.THREADPOOL_GROUP + "*", Validator.EMPTY); + registerClusterDynamicSetting(ThreadPool.THREADPOOL_GROUP + "*", ThreadPool.THREAD_POOL_TYPE_SETTINGS_VALIDATOR); registerClusterDynamicSetting(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, Validator.INTEGER); registerClusterDynamicSetting(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, Validator.INTEGER); registerClusterDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, Validator.EMPTY); diff --git a/core/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java b/core/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java index 311c4eca2cc58..2ad9defe2a73d 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java @@ -288,7 +288,7 @@ private Table buildTable(RestRequest req, ClusterStateResponse state, NodesInfoR } } - table.addCell(poolInfo == null ? null : poolInfo.getType()); + table.addCell(poolInfo == null ? null : poolInfo.getThreadPoolType()); table.addCell(poolStats == null ? null : poolStats.getActive()); table.addCell(poolStats == null ? null : poolStats.getThreads()); table.addCell(poolStats == null ? null : poolStats.getQueue()); diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 039b46b5c7a52..b0d81279b033b 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -20,6 +20,8 @@ package org.elasticsearch.threadpool; import org.apache.lucene.util.Counter; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.settings.Validator; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; @@ -39,22 +41,11 @@ import org.elasticsearch.node.settings.NodeSettingsService; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.common.settings.Settings.settingsBuilder; @@ -86,6 +77,101 @@ public static class Names { public static final String FETCH_SHARD_STORE = "fetch_shard_store"; } + public enum ThreadPoolType { + CACHED("cached"), + DIRECT("direct"), + FIXED("fixed"), + SCALING("scaling"); + + private final String type; + + public String getType() { + return type; + } + + ThreadPoolType(String type) { + this.type = type; + } + + private final static Map TYPE_MAP; + + static { + Map typeMap = new HashMap<>(); + for (ThreadPoolType threadPoolType : ThreadPoolType.values()) { + typeMap.put(threadPoolType.getType(), threadPoolType); + } + TYPE_MAP = Collections.unmodifiableMap(typeMap); + } + + public static ThreadPoolType fromType(String type) { + ThreadPoolType threadPoolType = TYPE_MAP.get(type); + if (threadPoolType == null) { + throw new IllegalArgumentException("no ThreadPoolType for " + type); + } + return threadPoolType; + } + } + + public static Map THREAD_POOL_TYPES; + + static { + HashMap map = new HashMap<>(); + map.put(Names.SAME, ThreadPoolType.DIRECT); + map.put(Names.GENERIC, ThreadPoolType.CACHED); + map.put(Names.LISTENER, ThreadPoolType.FIXED); + map.put(Names.GET, ThreadPoolType.FIXED); + map.put(Names.INDEX, ThreadPoolType.FIXED); + map.put(Names.BULK, ThreadPoolType.FIXED); + map.put(Names.SEARCH, ThreadPoolType.FIXED); + map.put(Names.SUGGEST, ThreadPoolType.FIXED); + map.put(Names.PERCOLATE, ThreadPoolType.FIXED); + map.put(Names.MANAGEMENT, ThreadPoolType.SCALING); + map.put(Names.FLUSH, ThreadPoolType.SCALING); + map.put(Names.REFRESH, ThreadPoolType.SCALING); + map.put(Names.WARMER, ThreadPoolType.SCALING); + map.put(Names.SNAPSHOT, ThreadPoolType.SCALING); + map.put(Names.FORCE_MERGE, ThreadPoolType.FIXED); + map.put(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING); + map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING); + THREAD_POOL_TYPES = Collections.unmodifiableMap(map); + } + + private static void add(Map executorSettings, ExecutorSettingsBuilder builder) { + Settings settings = builder.build(); + String name = settings.get("name"); + executorSettings.put(name, settings); + } + + private static class ExecutorSettingsBuilder { + Map settings = new HashMap<>(); + + public ExecutorSettingsBuilder(String name) { + settings.put("name", name); + settings.put("type", THREAD_POOL_TYPES.get(name).getType()); + } + + public ExecutorSettingsBuilder size(int availableProcessors) { + return add("size", Integer.toString(availableProcessors)); + } + + public ExecutorSettingsBuilder queueSize(int queueSize) { + return add("queue_size", Integer.toString(queueSize)); + } + + public ExecutorSettingsBuilder keepAlive(String keepAlive) { + return add("keep_alive", keepAlive); + } + + private ExecutorSettingsBuilder add(String key, String value) { + settings.put(key, value); + return this; + } + + public Settings build() { + return settingsBuilder().put(settings).build(); + } + } + public static final String THREADPOOL_GROUP = "threadpool."; private volatile Map executors; @@ -102,7 +188,6 @@ public static class Names { static final Executor DIRECT_EXECUTOR = command -> command.run(); - public ThreadPool(String name) { this(Settings.builder().put("name", name).build()); } @@ -112,42 +197,31 @@ public ThreadPool(Settings settings) { assert settings.get("name") != null : "ThreadPool's settings should contain a name"; - Map groupSettings = settings.getGroups(THREADPOOL_GROUP); + Map groupSettings = getThreadPoolSettingsGroup(settings); int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); int halfProcMaxAt5 = Math.min(((availableProcessors + 1) / 2), 5); int halfProcMaxAt10 = Math.min(((availableProcessors + 1) / 2), 10); Map defaultExecutorTypeSettings = new HashMap<>(); - defaultExecutorTypeSettings.put(Names.GENERIC, settingsBuilder().put("type", "cached").put("keep_alive", "30s").build()); - defaultExecutorTypeSettings.put(Names.INDEX, - settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 200).build()); - defaultExecutorTypeSettings.put(Names.BULK, - settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 50).build()); - defaultExecutorTypeSettings.put(Names.GET, - settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 1000).build()); - defaultExecutorTypeSettings.put(Names.SEARCH, - settingsBuilder().put("type", "fixed").put("size", ((availableProcessors * 3) / 2) + 1).put("queue_size", 1000).build()); - defaultExecutorTypeSettings.put(Names.SUGGEST, - settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 1000).build()); - defaultExecutorTypeSettings.put(Names.PERCOLATE, - settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 1000).build()); - defaultExecutorTypeSettings .put(Names.MANAGEMENT, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 5).build()); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.GENERIC).keepAlive("30s")); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.INDEX).size(availableProcessors).queueSize(200)); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.BULK).size(availableProcessors).queueSize(50)); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.GET).size(availableProcessors).queueSize(1000)); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.SEARCH).size(((availableProcessors * 3) / 2) + 1).queueSize(1000)); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.SUGGEST).size(availableProcessors).queueSize(1000)); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.PERCOLATE).size(availableProcessors).queueSize(1000)); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.MANAGEMENT).size(5).keepAlive("5m")); // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded // the assumption here is that the listeners should be very lightweight on the listeners side - defaultExecutorTypeSettings.put(Names.LISTENER, settingsBuilder().put("type", "fixed").put("size", halfProcMaxAt10).build()); - defaultExecutorTypeSettings.put(Names.FLUSH, - settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()); - defaultExecutorTypeSettings.put(Names.REFRESH, - settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt10).build()); - defaultExecutorTypeSettings.put(Names.WARMER, - settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()); - defaultExecutorTypeSettings.put(Names.SNAPSHOT, - settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()); - defaultExecutorTypeSettings.put(Names.FORCE_MERGE, settingsBuilder().put("type", "fixed").put("size", 1).build()); - defaultExecutorTypeSettings.put(Names.FETCH_SHARD_STARTED, - settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", availableProcessors * 2).build()); - defaultExecutorTypeSettings.put(Names.FETCH_SHARD_STORE, - settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", availableProcessors * 2).build()); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.LISTENER).size(halfProcMaxAt10)); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FLUSH).size(halfProcMaxAt5).keepAlive("5m")); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.REFRESH).size(halfProcMaxAt10).keepAlive("5m")); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.WARMER).size(halfProcMaxAt5).keepAlive("5m")); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.SNAPSHOT).size(halfProcMaxAt5).keepAlive("5m")); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FORCE_MERGE).size(1)); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FETCH_SHARD_STARTED).size(availableProcessors * 2).keepAlive("5m")); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FETCH_SHARD_STORE).size(availableProcessors * 2).keepAlive("5m")); + this.defaultExecutorTypeSettings = unmodifiableMap(defaultExecutorTypeSettings); Map executors = new HashMap<>(); @@ -163,8 +237,8 @@ public ThreadPool(Settings settings) { executors.put(entry.getKey(), build(entry.getKey(), entry.getValue(), Settings.EMPTY)); } - executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, "same"))); - if (!executors.get(Names.GENERIC).info.getType().equals("cached")) { + executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT))); + if (!executors.get(Names.GENERIC).info.getThreadPoolType().equals(ThreadPoolType.CACHED)) { throw new IllegalArgumentException("generic thread pool must be of type cached"); } this.executors = unmodifiableMap(executors); @@ -178,6 +252,12 @@ public ThreadPool(Settings settings) { this.estimatedTimeThread.start(); } + private Map getThreadPoolSettingsGroup(Settings settings) { + Map groupSettings = settings.getGroups(THREADPOOL_GROUP); + validate(groupSettings); + return groupSettings; + } + public void setNodeSettingsService(NodeSettingsService nodeSettingsService) { if(settingsListenerIsSet) { throw new IllegalStateException("the node settings listener was set more then once"); @@ -326,24 +406,28 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde settings = Settings.Builder.EMPTY_SETTINGS; } Info previousInfo = previousExecutorHolder != null ? previousExecutorHolder.info : null; - String type = settings.get("type", previousInfo != null ? previousInfo.getType() : defaultSettings.get("type")); + String type = settings.get("type", previousInfo != null ? previousInfo.getThreadPoolType().getType() : defaultSettings.get("type")); + ThreadPoolType threadPoolType = ThreadPoolType.fromType(type); ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, name); - if ("same".equals(type)) { + if (ThreadPoolType.DIRECT == threadPoolType) { if (previousExecutorHolder != null) { logger.debug("updating thread_pool [{}], type [{}]", name, type); } else { logger.debug("creating thread_pool [{}], type [{}]", name, type); } - return new ExecutorHolder(DIRECT_EXECUTOR, new Info(name, type)); - } else if ("cached".equals(type)) { + return new ExecutorHolder(DIRECT_EXECUTOR, new Info(name, threadPoolType)); + } else if (ThreadPoolType.CACHED == threadPoolType) { + if (!Names.GENERIC.equals(name)) { + throw new IllegalArgumentException("thread pool type cached is reserved only for the generic thread pool and can not be applied to [" + name + "]"); + } TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)); if (previousExecutorHolder != null) { - if ("cached".equals(previousInfo.getType())) { + if (ThreadPoolType.CACHED == previousInfo.getThreadPoolType()) { TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive()); if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) { logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive); ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS); - return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, type, -1, -1, updatedKeepAlive, null)); + return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, threadPoolType, -1, -1, updatedKeepAlive, null)); } return previousExecutorHolder; } @@ -358,13 +442,13 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive); } Executor executor = EsExecutors.newCached(name, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); - return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null)); - } else if ("fixed".equals(type)) { + return new ExecutorHolder(executor, new Info(name, threadPoolType, -1, -1, keepAlive, null)); + } else if (ThreadPoolType.FIXED == threadPoolType) { int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings)); SizeValue defaultQueueSize = getAsSizeOrUnbounded(defaultSettings, "queue", getAsSizeOrUnbounded(defaultSettings, "queue_size", null)); if (previousExecutorHolder != null) { - if ("fixed".equals(previousInfo.getType())) { + if (ThreadPoolType.FIXED == previousInfo.getThreadPoolType()) { SizeValue updatedQueueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", previousInfo.getQueueSize()))); if (Objects.equals(previousInfo.getQueueSize(), updatedQueueSize)) { int updatedSize = settings.getAsInt("size", previousInfo.getMax()); @@ -378,7 +462,7 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setCorePoolSize(updatedSize); ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setMaximumPoolSize(updatedSize); } - return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, type, updatedSize, updatedSize, null, updatedQueueSize)); + return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, threadPoolType, updatedSize, updatedSize, null, updatedQueueSize)); } return previousExecutorHolder; } @@ -393,13 +477,13 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde SizeValue queueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", defaultQueueSize))); logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize); Executor executor = EsExecutors.newFixed(name, size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory); - return new ExecutorHolder(executor, new Info(name, type, size, size, null, queueSize)); - } else if ("scaling".equals(type)) { + return new ExecutorHolder(executor, new Info(name, threadPoolType, size, size, null, queueSize)); + } else if (ThreadPoolType.SCALING == threadPoolType) { TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)); int defaultMin = defaultSettings.getAsInt("min", 1); int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings)); if (previousExecutorHolder != null) { - if ("scaling".equals(previousInfo.getType())) { + if (ThreadPoolType.SCALING == previousInfo.getThreadPoolType()) { TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive()); int updatedMin = settings.getAsInt("min", previousInfo.getMin()); int updatedSize = settings.getAsInt("max", settings.getAsInt("size", previousInfo.getMax())); @@ -414,7 +498,7 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde if (previousInfo.getMax() != updatedSize) { ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setMaximumPoolSize(updatedSize); } - return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, type, updatedMin, updatedSize, updatedKeepAlive, null)); + return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, threadPoolType, updatedMin, updatedSize, updatedKeepAlive, null)); } return previousExecutorHolder; } @@ -437,13 +521,13 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive); } Executor executor = EsExecutors.newScaling(name, min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); - return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, null)); + return new ExecutorHolder(executor, new Info(name, threadPoolType, min, size, keepAlive, null)); } throw new IllegalArgumentException("No type found [" + type + "], for [" + name + "]"); } public void updateSettings(Settings settings) { - Map groupSettings = settings.getGroups("threadpool"); + Map groupSettings = getThreadPoolSettingsGroup(settings); if (groupSettings.isEmpty()) { return; } @@ -490,6 +574,20 @@ public void updateSettings(Settings settings) { } } + private void validate(Map groupSettings) { + for (String key : groupSettings.keySet()) { + if (!THREAD_POOL_TYPES.containsKey(key)) { + continue; + } + String type = groupSettings.get(key).get("type"); + ThreadPoolType correctThreadPoolType = THREAD_POOL_TYPES.get(key); + // TODO: the type equality check can be removed after #3760/#6732 are addressed + if (type != null && !correctThreadPoolType.getType().equals(type)) { + throw new IllegalArgumentException("setting " + THREADPOOL_GROUP + key + ".type to " + type + " is not permitted; must be " + correctThreadPoolType.getType()); + } + } + } + /** * A thread pool size can also be unbounded and is represented by -1, which is not supported by SizeValue (which only supports positive numbers) */ @@ -643,7 +741,7 @@ Executor executor() { public static class Info implements Streamable, ToXContent { private String name; - private String type; + private ThreadPoolType type; private int min; private int max; private TimeValue keepAlive; @@ -653,15 +751,15 @@ public static class Info implements Streamable, ToXContent { } - public Info(String name, String type) { + public Info(String name, ThreadPoolType type) { this(name, type, -1); } - public Info(String name, String type, int size) { + public Info(String name, ThreadPoolType type, int size) { this(name, type, size, size, null, null); } - public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue queueSize) { + public Info(String name, ThreadPoolType type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue queueSize) { this.name = name; this.type = type; this.min = min; @@ -674,7 +772,7 @@ public String getName() { return this.name; } - public String getType() { + public ThreadPoolType getThreadPoolType() { return this.type; } @@ -699,7 +797,7 @@ public SizeValue getQueueSize() { @Override public void readFrom(StreamInput in) throws IOException { name = in.readString(); - type = in.readString(); + type = ThreadPoolType.fromType(in.readString()); min = in.readInt(); max = in.readInt(); if (in.readBoolean()) { @@ -716,7 +814,7 @@ public void readFrom(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(name); - out.writeString(type); + out.writeString(type.getType()); out.writeInt(min); out.writeInt(max); if (keepAlive == null) { @@ -739,7 +837,7 @@ public void writeTo(StreamOutput out) throws IOException { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(name, XContentBuilder.FieldCaseConversion.NONE); - builder.field(Fields.TYPE, type); + builder.field(Fields.TYPE, type.getType()); if (min != -1) { builder.field(Fields.MIN, min); } @@ -814,4 +912,37 @@ public static boolean terminate(ThreadPool pool, long timeout, TimeUnit timeUnit return false; } + public static ThreadPoolTypeSettingsValidator THREAD_POOL_TYPE_SETTINGS_VALIDATOR = new ThreadPoolTypeSettingsValidator(); + private static class ThreadPoolTypeSettingsValidator implements Validator { + @Override + public String validate(String setting, String value, ClusterState clusterState) { + // TODO: the type equality validation can be removed after #3760/#6732 are addressed + Matcher matcher = Pattern.compile("threadpool\\.(.*)\\.type").matcher(setting); + if (!matcher.matches()) { + return null; + } else { + String threadPool = matcher.group(1); + ThreadPool.ThreadPoolType defaultThreadPoolType = ThreadPool.THREAD_POOL_TYPES.get(threadPool); + ThreadPool.ThreadPoolType threadPoolType; + try { + threadPoolType = ThreadPool.ThreadPoolType.fromType(value); + } catch (IllegalArgumentException e) { + return e.getMessage(); + } + if (defaultThreadPoolType.equals(threadPoolType)) { + return null; + } else { + return String.format( + Locale.ROOT, + "thread pool type for [%s] can only be updated to [%s] but was [%s]", + threadPool, + defaultThreadPoolType.getType(), + threadPoolType.getType() + ); + } + } + + } + } + } diff --git a/core/src/test/java/org/elasticsearch/search/SearchWithRejectionsIT.java b/core/src/test/java/org/elasticsearch/search/SearchWithRejectionsIT.java index a8c6a194c56fd..d3e3de5fda14c 100644 --- a/core/src/test/java/org/elasticsearch/search/SearchWithRejectionsIT.java +++ b/core/src/test/java/org/elasticsearch/search/SearchWithRejectionsIT.java @@ -37,7 +37,6 @@ public class SearchWithRejectionsIT extends ESIntegTestCase { @Override public Settings nodeSettings(int nodeOrdinal) { return settingsBuilder().put(super.nodeSettings(nodeOrdinal)) - .put("threadpool.search.type", "fixed") .put("threadpool.search.size", 1) .put("threadpool.search.queue_size", 1) .build(); diff --git a/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java b/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java index d52f67dc82ca7..838c2a6d401ce 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java +++ b/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java @@ -46,20 +46,13 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.Executor; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.regex.Pattern; import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.sameInstance; +import static org.hamcrest.Matchers.*; /** */ @@ -67,7 +60,7 @@ public class SimpleThreadPoolIT extends ESIntegTestCase { @Override protected Settings nodeSettings(int nodeOrdinal) { - return Settings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put("threadpool.search.type", "cached").build(); + return Settings.settingsBuilder().build(); } public void testThreadNames() throws Exception { @@ -130,26 +123,23 @@ public void testUpdatingThreadPoolSettings() throws Exception { internalCluster().startNodesAsync(2).get(); ThreadPool threadPool = internalCluster().getDataNodeInstance(ThreadPool.class); // Check that settings are changed - assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(5L)); - client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.keep_alive", "10m").build()).execute().actionGet(); - assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); + assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getQueue().remainingCapacity(), equalTo(1000)); + client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.queue_size", 2000).build()).execute().actionGet(); + assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getQueue().remainingCapacity(), equalTo(2000)); // Make sure that threads continue executing when executor is replaced final CyclicBarrier barrier = new CyclicBarrier(2); Executor oldExecutor = threadPool.executor(Names.SEARCH); - threadPool.executor(Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - try { - barrier.await(); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } catch (BrokenBarrierException ex) { - // - } - } - }); - client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.type", "fixed").build()).execute().actionGet(); + threadPool.executor(Names.SEARCH).execute(() -> { + try { + barrier.await(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } catch (BrokenBarrierException ex) { + // + } + }); + client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.queue_size", 1000).build()).execute().actionGet(); assertThat(threadPool.executor(Names.SEARCH), not(sameInstance(oldExecutor))); assertThat(((ThreadPoolExecutor) oldExecutor).isShutdown(), equalTo(true)); assertThat(((ThreadPoolExecutor) oldExecutor).isTerminating(), equalTo(true)); @@ -157,24 +147,19 @@ public void run() { barrier.await(10, TimeUnit.SECONDS); // Make sure that new thread executor is functional - threadPool.executor(Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - try { - barrier.await(); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } catch (BrokenBarrierException ex) { - // + threadPool.executor(Names.SEARCH).execute(() -> { + try { + barrier.await(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } catch (BrokenBarrierException ex) { + // + } } - } - }); - client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.type", "fixed").build()).execute().actionGet(); + ); + client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.queue_size", 500)).execute().actionGet(); barrier.await(10, TimeUnit.SECONDS); - // This was here: Thread.sleep(200); - // Why? What was it for? - // Check that node info is correct NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo().all().execute().actionGet(); for (int i = 0; i < 2; i++) { @@ -182,7 +167,7 @@ public void run() { boolean found = false; for (ThreadPool.Info info : nodeInfo.getThreadPool()) { if (info.getName().equals(Names.SEARCH)) { - assertThat(info.getType(), equalTo("fixed")); + assertEquals(info.getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED); found = true; break; } diff --git a/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java b/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java index cb27fd71f9df9..3d57c1d520669 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java +++ b/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.threadpool; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.Version; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -30,7 +31,9 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.ESTestCase; +import org.junit.Before; +import java.io.IOException; import java.util.Map; import static org.elasticsearch.common.settings.Settings.settingsBuilder; @@ -44,9 +47,16 @@ */ public class ThreadPoolSerializationTests extends ESTestCase { BytesStreamOutput output = new BytesStreamOutput(); + private ThreadPool.ThreadPoolType threadPoolType; + + @Before + public void setUp() throws Exception { + super.setUp(); + threadPoolType = randomFrom(ThreadPool.ThreadPoolType.values()); + } public void testThatQueueSizeSerializationWorks() throws Exception { - ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), SizeValue.parseSizeValue("10k")); + ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType, 1, 10, TimeValue.timeValueMillis(3000), SizeValue.parseSizeValue("10k")); output.setVersion(Version.CURRENT); info.writeTo(output); @@ -58,7 +68,7 @@ public void testThatQueueSizeSerializationWorks() throws Exception { } public void testThatNegativeQueueSizesCanBeSerialized() throws Exception { - ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), null); + ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType, 1, 10, TimeValue.timeValueMillis(3000), null); output.setVersion(Version.CURRENT); info.writeTo(output); @@ -70,7 +80,7 @@ public void testThatNegativeQueueSizesCanBeSerialized() throws Exception { } public void testThatToXContentWritesOutUnboundedCorrectly() throws Exception { - ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), null); + ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType, 1, 10, TimeValue.timeValueMillis(3000), null); XContentBuilder builder = jsonBuilder(); builder.startObject(); info.toXContent(builder, ToXContent.EMPTY_PARAMS); @@ -95,7 +105,7 @@ public void testThatNegativeSettingAllowsToStart() throws InterruptedException { } public void testThatToXContentWritesInteger() throws Exception { - ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), SizeValue.parseSizeValue("1k")); + ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType, 1, 10, TimeValue.timeValueMillis(3000), SizeValue.parseSizeValue("1k")); XContentBuilder builder = jsonBuilder(); builder.startObject(); info.toXContent(builder, ToXContent.EMPTY_PARAMS); @@ -111,4 +121,16 @@ public void testThatToXContentWritesInteger() throws Exception { assertThat(map, hasKey("queue_size")); assertThat(map.get("queue_size").toString(), is("1000")); } + + public void testThatThreadPoolTypeIsSerializedCorrectly() throws IOException { + ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType); + output.setVersion(Version.CURRENT); + info.writeTo(output); + + StreamInput input = StreamInput.wrap(output.bytes()); + ThreadPool.Info newInfo = new ThreadPool.Info(); + newInfo.readFrom(input); + + assertThat(newInfo.getThreadPoolType(), is(threadPoolType)); + } } diff --git a/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolTypeSettingsValidatorTests.java b/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolTypeSettingsValidatorTests.java new file mode 100644 index 0000000000000..aa3b2a8dec2a2 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolTypeSettingsValidatorTests.java @@ -0,0 +1,54 @@ +package org.elasticsearch.threadpool; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.settings.Validator; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.*; + +import static org.junit.Assert.*; + +public class ThreadPoolTypeSettingsValidatorTests extends ESTestCase { + private Validator validator; + + @Before + public void setUp() throws Exception { + super.setUp(); + validator = ThreadPool.THREAD_POOL_TYPE_SETTINGS_VALIDATOR; + } + + public void testValidThreadPoolTypeSettings() { + for (Map.Entry entry : ThreadPool.THREAD_POOL_TYPES.entrySet()) { + assertNull(validateSetting(validator, entry.getKey(), entry.getValue().getType())); + } + } + + public void testInvalidThreadPoolTypeSettings() { + for (Map.Entry entry : ThreadPool.THREAD_POOL_TYPES.entrySet()) { + Set set = new HashSet<>(); + set.addAll(Arrays.asList(ThreadPool.ThreadPoolType.values())); + set.remove(entry.getValue()); + ThreadPool.ThreadPoolType invalidThreadPoolType = randomFrom(set.toArray(new ThreadPool.ThreadPoolType[set.size()])); + String expectedMessage = String.format( + Locale.ROOT, + "thread pool type for [%s] can only be updated to [%s] but was [%s]", + entry.getKey(), + entry.getValue().getType(), + invalidThreadPoolType.getType()); + String message = validateSetting(validator, entry.getKey(), invalidThreadPoolType.getType()); + assertNotNull(message); + assertEquals(expectedMessage, message); + } + } + + public void testNonThreadPoolTypeSetting() { + String setting = ThreadPool.THREADPOOL_GROUP + randomAsciiOfLength(10) + "foo"; + String value = randomAsciiOfLength(10); + assertNull(validator.validate(setting, value, ClusterState.PROTO)); + } + + private String validateSetting(Validator validator, String threadPoolName, String value) { + return validator.validate(ThreadPool.THREADPOOL_GROUP + threadPoolName + ".type", value, ClusterState.PROTO); + } +} diff --git a/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java b/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java index cd252b60d715c..faa08243af05a 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java +++ b/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java @@ -25,272 +25,353 @@ import org.elasticsearch.threadpool.ThreadPool.Names; import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.elasticsearch.common.settings.Settings.settingsBuilder; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; -import static org.hamcrest.Matchers.sameInstance; +import static org.hamcrest.Matchers.*; /** */ public class UpdateThreadPoolSettingsTests extends ESTestCase { - private ThreadPool.Info info(ThreadPool threadPool, String name) { - for (ThreadPool.Info info : threadPool.info()) { - if (info.getName().equals(name)) { - return info; - } + public void testCorrectThreadPoolTypePermittedInSettings() throws InterruptedException { + String threadPoolName = randomThreadPoolName(); + ThreadPool.ThreadPoolType correctThreadPoolType = ThreadPool.THREAD_POOL_TYPES.get(threadPoolName); + ThreadPool threadPool = null; + try { + threadPool = new ThreadPool(settingsBuilder() + .put("name", "testCorrectThreadPoolTypePermittedInSettings") + .put("threadpool." + threadPoolName + ".type", correctThreadPoolType.getType()) + .build()); + assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), correctThreadPoolType); + } finally { + terminateThreadPoolIfNeeded(threadPool); + } + } + + public void testThreadPoolCanNotOverrideThreadPoolType() throws InterruptedException { + String threadPoolName = randomThreadPoolName(); + ThreadPool.ThreadPoolType incorrectThreadPoolType = randomIncorrectThreadPoolType(threadPoolName); + ThreadPool.ThreadPoolType correctThreadPoolType = ThreadPool.THREAD_POOL_TYPES.get(threadPoolName); + ThreadPool threadPool = null; + try { + threadPool = new ThreadPool( + settingsBuilder() + .put("name", "testThreadPoolCanNotOverrideThreadPoolType") + .put("threadpool." + threadPoolName + ".type", incorrectThreadPoolType.getType()) + .build()); + terminate(threadPool); + fail("expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertThat( + e.getMessage(), + is("setting threadpool." + threadPoolName + ".type to " + incorrectThreadPoolType.getType() + " is not permitted; must be " + correctThreadPoolType.getType())); + } finally { + terminateThreadPoolIfNeeded(threadPool); + } + } + + public void testUpdateSettingsCanNotChangeThreadPoolType() throws InterruptedException { + String threadPoolName = randomThreadPoolName(); + ThreadPool.ThreadPoolType invalidThreadPoolType = randomIncorrectThreadPoolType(threadPoolName); + ThreadPool.ThreadPoolType validThreadPoolType = ThreadPool.THREAD_POOL_TYPES.get(threadPoolName); + ThreadPool threadPool = null; + try { + threadPool = new ThreadPool(settingsBuilder().put("name", "testUpdateSettingsCanNotChangeThreadPoolType").build()); + + + threadPool.updateSettings( + settingsBuilder() + .put("threadpool." + threadPoolName + ".type", invalidThreadPoolType.getType()) + .build() + ); + fail("expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertThat( + e.getMessage(), + is("setting threadpool." + threadPoolName + ".type to " + invalidThreadPoolType.getType() + " is not permitted; must be " + validThreadPoolType.getType())); + } finally { + terminateThreadPoolIfNeeded(threadPool); } - return null; } public void testCachedExecutorType() throws InterruptedException { - ThreadPool threadPool = new ThreadPool( - Settings.settingsBuilder() - .put("threadpool.search.type", "cached") - .put("name","testCachedExecutorType").build()); - - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached")); - assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(5L)); - assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); - - // Replace with different type - threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "same").build()); - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("same")); - assertThat(threadPool.executor(Names.SEARCH), is(ThreadPool.DIRECT_EXECUTOR)); - - // Replace with different type again - threadPool.updateSettings(settingsBuilder() - .put("threadpool.search.type", "scaling") - .put("threadpool.search.keep_alive", "10m") - .build()); - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling")); - assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(1)); - // Make sure keep alive value changed - assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); - - // Put old type back - threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "cached").build()); - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached")); - // Make sure keep alive value reused - assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L)); - assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); - - // Change keep alive - Executor oldExecutor = threadPool.executor(Names.SEARCH); - threadPool.updateSettings(settingsBuilder().put("threadpool.search.keep_alive", "1m").build()); - // Make sure keep alive value changed - assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(1L)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L)); - // Make sure executor didn't change - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached")); - assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); - - // Set the same keep alive - threadPool.updateSettings(settingsBuilder().put("threadpool.search.keep_alive", "1m").build()); - // Make sure keep alive value didn't change - assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(1L)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L)); - // Make sure executor didn't change - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached")); - assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); - terminate(threadPool); + String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.CACHED); + ThreadPool threadPool = null; + try { + threadPool = new ThreadPool( + Settings.settingsBuilder() + .put("name", "testCachedExecutorType").build()); + + assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED); + assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); + + threadPool.updateSettings(settingsBuilder() + .put("threadpool." + threadPoolName + ".keep_alive", "10m") + .build()); + assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED); + assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(0)); + // Make sure keep alive value changed + assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(10L)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); + + // Make sure keep alive value reused + assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(10L)); + assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); + + // Change keep alive + Executor oldExecutor = threadPool.executor(threadPoolName); + threadPool.updateSettings(settingsBuilder().put("threadpool." + threadPoolName + ".keep_alive", "1m").build()); + // Make sure keep alive value changed + assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(1L)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L)); + // Make sure executor didn't change + assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED); + assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor)); + + // Set the same keep alive + threadPool.updateSettings(settingsBuilder().put("threadpool." + threadPoolName + ".keep_alive", "1m").build()); + // Make sure keep alive value didn't change + assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(1L)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L)); + // Make sure executor didn't change + assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED); + assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor)); + } finally { + terminateThreadPoolIfNeeded(threadPool); + } } public void testFixedExecutorType() throws InterruptedException { - ThreadPool threadPool = new ThreadPool(settingsBuilder() - .put("threadpool.search.type", "fixed") - .put("name","testCachedExecutorType").build()); - - assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); - - // Replace with different type - threadPool.updateSettings(settingsBuilder() - .put("threadpool.search.type", "scaling") - .put("threadpool.search.keep_alive", "10m") - .put("threadpool.search.min", "2") - .put("threadpool.search.size", "15") - .build()); - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling")); - assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15)); - assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(2)); - assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(15)); - // Make sure keep alive value changed - assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); - - // Put old type back - threadPool.updateSettings(settingsBuilder() - .put("threadpool.search.type", "fixed") - .build()); - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("fixed")); - // Make sure keep alive value is not used - assertThat(info(threadPool, Names.SEARCH).getKeepAlive(), nullValue()); - // Make sure keep pool size value were reused - assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(15)); - assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(15)); - assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(15)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15)); - - // Change size - Executor oldExecutor = threadPool.executor(Names.SEARCH); - threadPool.updateSettings(settingsBuilder().put("threadpool.search.size", "10").build()); - // Make sure size values changed - assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(10)); - assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(10)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(10)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(10)); - // Make sure executor didn't change - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("fixed")); - assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); - - // Change queue capacity - threadPool.updateSettings(settingsBuilder() - .put("threadpool.search.queue", "500") - .build()); - - terminate(threadPool); + String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.FIXED); + ThreadPool threadPool = null; + + try { + threadPool = new ThreadPool(settingsBuilder() + .put("name", "testCachedExecutorType").build()); + assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); + + threadPool.updateSettings(settingsBuilder() + .put("threadpool." + threadPoolName + ".size", "15") + .build()); + assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED); + assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(15)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(15)); + assertThat(info(threadPool, threadPoolName).getMin(), equalTo(15)); + assertThat(info(threadPool, threadPoolName).getMax(), equalTo(15)); + // keep alive does not apply to fixed thread pools + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(0L)); + + // Put old type back + threadPool.updateSettings(Settings.EMPTY); + assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED); + // Make sure keep alive value is not used + assertThat(info(threadPool, threadPoolName).getKeepAlive(), nullValue()); + // Make sure keep pool size value were reused + assertThat(info(threadPool, threadPoolName).getMin(), equalTo(15)); + assertThat(info(threadPool, threadPoolName).getMax(), equalTo(15)); + assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(15)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(15)); + + // Change size + Executor oldExecutor = threadPool.executor(threadPoolName); + threadPool.updateSettings(settingsBuilder().put("threadpool." + threadPoolName + ".size", "10").build()); + // Make sure size values changed + assertThat(info(threadPool, threadPoolName).getMax(), equalTo(10)); + assertThat(info(threadPool, threadPoolName).getMin(), equalTo(10)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(10)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(10)); + // Make sure executor didn't change + assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED); + assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor)); + + // Change queue capacity + threadPool.updateSettings(settingsBuilder() + .put("threadpool." + threadPoolName + ".queue", "500") + .build()); + } finally { + terminateThreadPoolIfNeeded(threadPool); + } } public void testScalingExecutorType() throws InterruptedException { - ThreadPool threadPool = new ThreadPool(settingsBuilder() - .put("threadpool.search.type", "scaling") - .put("threadpool.search.size", 10) - .put("name","testCachedExecutorType").build()); - - assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(1)); - assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(10)); - assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(5L)); - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling")); - assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); - - // Change settings that doesn't require pool replacement - Executor oldExecutor = threadPool.executor(Names.SEARCH); - threadPool.updateSettings(settingsBuilder() - .put("threadpool.search.type", "scaling") - .put("threadpool.search.keep_alive", "10m") - .put("threadpool.search.min", "2") - .put("threadpool.search.size", "15") - .build()); - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling")); - assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15)); - assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(2)); - assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(15)); - // Make sure keep alive value changed - assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); - assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); - - terminate(threadPool); + String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING); + ThreadPool threadPool = null; + try { + threadPool = new ThreadPool(settingsBuilder() + .put("threadpool." + threadPoolName + ".size", 10) + .put("name", "testCachedExecutorType").build()); + assertThat(info(threadPool, threadPoolName).getMin(), equalTo(1)); + assertThat(info(threadPool, threadPoolName).getMax(), equalTo(10)); + assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(5L)); + assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.SCALING); + assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); + + // Change settings that doesn't require pool replacement + Executor oldExecutor = threadPool.executor(threadPoolName); + threadPool.updateSettings(settingsBuilder() + .put("threadpool." + threadPoolName + ".keep_alive", "10m") + .put("threadpool." + threadPoolName + ".min", "2") + .put("threadpool." + threadPoolName + ".size", "15") + .build()); + assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.SCALING); + assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(2)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(15)); + assertThat(info(threadPool, threadPoolName).getMin(), equalTo(2)); + assertThat(info(threadPool, threadPoolName).getMax(), equalTo(15)); + // Make sure keep alive value changed + assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(10L)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); + assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor)); + } finally { + terminateThreadPoolIfNeeded(threadPool); + } } public void testShutdownNowInterrupts() throws Exception { - ThreadPool threadPool = new ThreadPool(Settings.settingsBuilder() - .put("threadpool.search.type", "cached") - .put("name","testCachedExecutorType").build()); - - final CountDownLatch latch = new CountDownLatch(1); - ThreadPoolExecutor oldExecutor = (ThreadPoolExecutor) threadPool.executor(Names.SEARCH); - threadPool.executor(Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - try { - new CountDownLatch(1).await(); - } catch (InterruptedException ex) { - latch.countDown(); - Thread.currentThread().interrupt(); - } - } - }); - threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "fixed").build()); - assertThat(threadPool.executor(Names.SEARCH), not(sameInstance(oldExecutor))); - assertThat(oldExecutor.isShutdown(), equalTo(true)); - assertThat(oldExecutor.isTerminating(), equalTo(true)); - assertThat(oldExecutor.isTerminated(), equalTo(false)); - threadPool.shutdownNow(); // should interrupt the thread - latch.await(3, TimeUnit.SECONDS); // If this throws then shotdownNow didn't interrupt - terminate(threadPool); + String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.FIXED); + ThreadPool threadPool = null; + try { + threadPool = new ThreadPool(Settings.settingsBuilder() + .put("threadpool." + threadPoolName + ".queue_size", 1000) + .put("name", "testCachedExecutorType").build()); + assertEquals(info(threadPool, threadPoolName).getQueueSize().getSingles(), 1000L); + + final CountDownLatch latch = new CountDownLatch(1); + ThreadPoolExecutor oldExecutor = (ThreadPoolExecutor) threadPool.executor(threadPoolName); + threadPool.executor(threadPoolName).execute(() -> { + try { + new CountDownLatch(1).await(); + } catch (InterruptedException ex) { + latch.countDown(); + Thread.currentThread().interrupt(); + } + } + ); + threadPool.updateSettings(settingsBuilder().put("threadpool." + threadPoolName + ".queue_size", 2000).build()); + assertThat(threadPool.executor(threadPoolName), not(sameInstance(oldExecutor))); + assertThat(oldExecutor.isShutdown(), equalTo(true)); + assertThat(oldExecutor.isTerminating(), equalTo(true)); + assertThat(oldExecutor.isTerminated(), equalTo(false)); + threadPool.shutdownNow(); // should interrupt the thread + latch.await(3, TimeUnit.SECONDS); // If this throws then ThreadPool#shutdownNow didn't interrupt + } finally { + terminateThreadPoolIfNeeded(threadPool); + } } public void testCustomThreadPool() throws Exception { - ThreadPool threadPool = new ThreadPool(Settings.settingsBuilder() - .put("threadpool.my_pool1.type", "cached") - .put("threadpool.my_pool2.type", "fixed") - .put("threadpool.my_pool2.size", "1") - .put("threadpool.my_pool2.queue_size", "1") - .put("name", "testCustomThreadPool").build()); - - ThreadPoolInfo groups = threadPool.info(); - boolean foundPool1 = false; - boolean foundPool2 = false; - outer: for (ThreadPool.Info info : groups) { - if ("my_pool1".equals(info.getName())) { - foundPool1 = true; - assertThat(info.getType(), equalTo("cached")); - } else if ("my_pool2".equals(info.getName())) { - foundPool2 = true; - assertThat(info.getType(), equalTo("fixed")); - assertThat(info.getMin(), equalTo(1)); - assertThat(info.getMax(), equalTo(1)); - assertThat(info.getQueueSize().singles(), equalTo(1l)); - } else { - for (Field field : Names.class.getFields()) { - if (info.getName().equalsIgnoreCase(field.getName())) { - // This is ok it is a default thread pool - continue outer; + ThreadPool threadPool = null; + try { + threadPool = new ThreadPool(Settings.settingsBuilder() + .put("threadpool.my_pool1.type", "scaling") + .put("threadpool.my_pool2.type", "fixed") + .put("threadpool.my_pool2.size", "1") + .put("threadpool.my_pool2.queue_size", "1") + .put("name", "testCustomThreadPool").build()); + ThreadPoolInfo groups = threadPool.info(); + boolean foundPool1 = false; + boolean foundPool2 = false; + outer: + for (ThreadPool.Info info : groups) { + if ("my_pool1".equals(info.getName())) { + foundPool1 = true; + assertEquals(info.getThreadPoolType(), ThreadPool.ThreadPoolType.SCALING); + } else if ("my_pool2".equals(info.getName())) { + foundPool2 = true; + assertEquals(info.getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED); + assertThat(info.getMin(), equalTo(1)); + assertThat(info.getMax(), equalTo(1)); + assertThat(info.getQueueSize().singles(), equalTo(1l)); + } else { + for (Field field : Names.class.getFields()) { + if (info.getName().equalsIgnoreCase(field.getName())) { + // This is ok it is a default thread pool + continue outer; + } } + fail("Unexpected pool name: " + info.getName()); } - fail("Unexpected pool name: " + info.getName()); } - } - assertThat(foundPool1, is(true)); - assertThat(foundPool2, is(true)); - - // Updating my_pool2 - Settings settings = Settings.builder() - .put("threadpool.my_pool2.size", "10") - .build(); - threadPool.updateSettings(settings); - - groups = threadPool.info(); - foundPool1 = false; - foundPool2 = false; - outer: for (ThreadPool.Info info : groups) { - if ("my_pool1".equals(info.getName())) { - foundPool1 = true; - assertThat(info.getType(), equalTo("cached")); - } else if ("my_pool2".equals(info.getName())) { - foundPool2 = true; - assertThat(info.getMax(), equalTo(10)); - assertThat(info.getMin(), equalTo(10)); - assertThat(info.getQueueSize().singles(), equalTo(1l)); - assertThat(info.getType(), equalTo("fixed")); - } else { - for (Field field : Names.class.getFields()) { - if (info.getName().equalsIgnoreCase(field.getName())) { - // This is ok it is a default thread pool - continue outer; + assertThat(foundPool1, is(true)); + assertThat(foundPool2, is(true)); + + // Updating my_pool2 + Settings settings = Settings.builder() + .put("threadpool.my_pool2.size", "10") + .build(); + threadPool.updateSettings(settings); + + groups = threadPool.info(); + foundPool1 = false; + foundPool2 = false; + outer: + for (ThreadPool.Info info : groups) { + if ("my_pool1".equals(info.getName())) { + foundPool1 = true; + assertEquals(info.getThreadPoolType(), ThreadPool.ThreadPoolType.SCALING); + } else if ("my_pool2".equals(info.getName())) { + foundPool2 = true; + assertThat(info.getMax(), equalTo(10)); + assertThat(info.getMin(), equalTo(10)); + assertThat(info.getQueueSize().singles(), equalTo(1l)); + assertEquals(info.getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED); + } else { + for (Field field : Names.class.getFields()) { + if (info.getName().equalsIgnoreCase(field.getName())) { + // This is ok it is a default thread pool + continue outer; + } } + fail("Unexpected pool name: " + info.getName()); } - fail("Unexpected pool name: " + info.getName()); + } + assertThat(foundPool1, is(true)); + assertThat(foundPool2, is(true)); + } finally { + terminateThreadPoolIfNeeded(threadPool); + } + } + + private void terminateThreadPoolIfNeeded(ThreadPool threadPool) throws InterruptedException { + if (threadPool != null) { + terminate(threadPool); + } + } + + private ThreadPool.Info info(ThreadPool threadPool, String name) { + for (ThreadPool.Info info : threadPool.info()) { + if (info.getName().equals(name)) { + return info; } } - assertThat(foundPool1, is(true)); - assertThat(foundPool2, is(true)); - terminate(threadPool); + return null; + } + + private String randomThreadPoolName() { + Set threadPoolNames = ThreadPool.THREAD_POOL_TYPES.keySet(); + return randomFrom(threadPoolNames.toArray(new String[threadPoolNames.size()])); } + private ThreadPool.ThreadPoolType randomIncorrectThreadPoolType(String threadPoolName) { + Set set = new HashSet<>(); + set.addAll(Arrays.asList(ThreadPool.ThreadPoolType.values())); + set.remove(ThreadPool.THREAD_POOL_TYPES.get(threadPoolName)); + ThreadPool.ThreadPoolType invalidThreadPoolType = randomFrom(set.toArray(new ThreadPool.ThreadPoolType[set.size()])); + return invalidThreadPoolType; + } + + private String randomThreadPool(ThreadPool.ThreadPoolType type) { + return randomFrom(ThreadPool.THREAD_POOL_TYPES.entrySet().stream().filter(t -> t.getValue().equals(type)).map(t -> t.getKey()).collect(Collectors.toList())); + } } diff --git a/docs/reference/migration/migrate_3_0.asciidoc b/docs/reference/migration/migrate_3_0.asciidoc index e0bdff8ddeeb2..aa602f9d64d9b 100644 --- a/docs/reference/migration/migrate_3_0.asciidoc +++ b/docs/reference/migration/migrate_3_0.asciidoc @@ -389,4 +389,12 @@ request cache and the field data cache. This setting would arbitrarily pick the first interface not marked as loopback. Instead, specify by address scope (e.g. `_local_,_site_` for all loopback and private network addresses) or by explicit interface names, -hostnames, or addresses. +hostnames, or addresses. + +=== Forbid changing of thread pool types + +Previously, <> could be dynamically adjusted. The thread pool type effectively +controls the backing queue for the thread pool and modifying this is an expert setting with minimal practical benefits +and high risk of being misused. The ability to change the thread pool type for any thread pool has been removed; do note +that it is still possible to adjust relevant thread pool parameters for each of the thread pools (e.g., depending on +the thread pool type, `keep_alive`, `queue_size`, etc.). diff --git a/docs/reference/modules/threadpool.asciidoc b/docs/reference/modules/threadpool.asciidoc index 591277889bc3d..bfd5474183cc5 100644 --- a/docs/reference/modules/threadpool.asciidoc +++ b/docs/reference/modules/threadpool.asciidoc @@ -9,87 +9,92 @@ of discarded. There are several thread pools, but the important ones include: +`generic`:: + For generic operations (e.g., background node discovery). + Thread pool type is `cached`. + `index`:: - For index/delete operations. Defaults to `fixed` + For index/delete operations. Thread pool type is `fixed` with a size of `# of available processors`, queue_size of `200`. `search`:: - For count/search operations. Defaults to `fixed` + For count/search operations. Thread pool type is `fixed` with a size of `int((# of available_processors * 3) / 2) + 1`, queue_size of `1000`. `suggest`:: - For suggest operations. Defaults to `fixed` + For suggest operations. Thread pool type is `fixed` with a size of `# of available processors`, queue_size of `1000`. `get`:: - For get operations. Defaults to `fixed` + For get operations. Thread pool type is `fixed` with a size of `# of available processors`, queue_size of `1000`. `bulk`:: - For bulk operations. Defaults to `fixed` + For bulk operations. Thread pool type is `fixed` with a size of `# of available processors`, queue_size of `50`. `percolate`:: - For percolate operations. Defaults to `fixed` + For percolate operations. Thread pool type is `fixed` with a size of `# of available processors`, queue_size of `1000`. `snapshot`:: - For snapshot/restore operations. Defaults to `scaling` with a - keep-alive of `5m` and a size of `min(5, (# of available processors)/2)`, max at 5. + For snapshot/restore operations. Thread pool type is `scaling` with a + keep-alive of `5m` and a size of `min(5, (# of available processors)/2)`. `warmer`:: - For segment warm-up operations. Defaults to `scaling` with a - keep-alive of `5m` and a size of `min(5, (# of available processors)/2)`, max at 5. + For segment warm-up operations. Thread pool type is `scaling` with a + keep-alive of `5m` and a size of `min(5, (# of available processors)/2)`. `refresh`:: - For refresh operations. Defaults to `scaling` with a - keep-alive of `5m` and a size of `min(10, (# of available processors)/2)`, max at 10. + For refresh operations. Thread pool type is `scaling` with a + keep-alive of `5m` and a size of `min(10, (# of available processors)/2)`. `listener`:: Mainly for java client executing of action when listener threaded is set to true. - Default size of `(# of available processors)/2`, max at 10. + Thread pool type is `scaling` with a default size of `min(10, (# of available processors)/2)`. -Changing a specific thread pool can be done by setting its type and -specific type parameters, for example, changing the `index` thread pool -to have more threads: +Changing a specific thread pool can be done by setting its type-specific parameters; for example, changing the `index` +thread pool to have more threads: [source,js] -------------------------------------------------- threadpool: index: - type: fixed size: 30 -------------------------------------------------- -NOTE: you can update threadpool settings live using - <>. - +NOTE: you can update thread pool settings dynamically using <>. [float] [[types]] === Thread pool types -The following are the types of thread pools that can be used and their -respective parameters: +The following are the types of thread pools and their respective parameters: [float] -==== `cache` +==== `cached` + +The `cached` thread pool is an unbounded thread pool that will spawn a +thread if there are pending requests. This thread pool is used to +prevent requests submitted to this pool from blocking or being +rejected. Unused threads in this thread pool will be terminated after +a keep alive expires (defaults to five minutes). The `cached` thread +pool is reserved for the <> thread pool. -The `cache` thread pool is an unbounded thread pool that will spawn a -thread if there are pending requests. Here is an example of how to set -it: +The `keep_alive` parameter determines how long a thread should be kept +around in the thread pool without doing any work. [source,js] -------------------------------------------------- threadpool: - index: - type: cached + generic: + keep_alive: 2m -------------------------------------------------- [float] @@ -111,7 +116,6 @@ full, it will abort the request. -------------------------------------------------- threadpool: index: - type: fixed size: 30 queue_size: 1000 -------------------------------------------------- @@ -130,7 +134,6 @@ around in the thread pool without it doing any work. -------------------------------------------------- threadpool: warmer: - type: scaling size: 8 keep_alive: 2m -------------------------------------------------- diff --git a/test-framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test-framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 6cafe4c3b7900..4f8e64cc1a0ea 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test-framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -64,10 +64,10 @@ import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.MockEngineFactoryPlugin; import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.MockEngineFactoryPlugin; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.IndexStoreConfig; import org.elasticsearch.indices.IndicesService; @@ -88,7 +88,6 @@ import org.elasticsearch.test.store.MockFSIndexStore; import org.elasticsearch.test.transport.AssertingLocalTransport; import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.netty.NettyTransport; @@ -98,20 +97,11 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Random; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.*; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; @@ -119,15 +109,11 @@ import java.util.stream.Stream; import static junit.framework.Assert.fail; -import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY; -import static org.apache.lucene.util.LuceneTestCase.rarely; -import static org.apache.lucene.util.LuceneTestCase.usually; +import static org.apache.lucene.util.LuceneTestCase.*; import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.test.ESTestCase.assertBusy; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.*; import static org.junit.Assert.assertThat; /** @@ -404,18 +390,6 @@ private Settings getRandomNodeSettings(long seed) { if (random.nextBoolean()) { // sometimes set a builder.put(SearchService.DEFAULT_KEEPALIVE_KEY, TimeValue.timeValueSeconds(100 + random.nextInt(5 * 60))); } - if (random.nextBoolean()) { - // change threadpool types to make sure we don't have components that rely on the type of thread pools - for (String name : Arrays.asList(ThreadPool.Names.BULK, ThreadPool.Names.FLUSH, ThreadPool.Names.GET, - ThreadPool.Names.INDEX, ThreadPool.Names.MANAGEMENT, ThreadPool.Names.FORCE_MERGE, - ThreadPool.Names.PERCOLATE, ThreadPool.Names.REFRESH, ThreadPool.Names.SEARCH, ThreadPool.Names.SNAPSHOT, - ThreadPool.Names.SUGGEST, ThreadPool.Names.WARMER)) { - if (random.nextBoolean()) { - final String type = RandomPicks.randomFrom(random, Arrays.asList("fixed", "cached", "scaling")); - builder.put(ThreadPool.THREADPOOL_GROUP + name + ".type", type); - } - } - } if (random.nextInt(10) == 0) { // node gets an extra cpu this time