Skip to content

Commit 00d6170

Browse files
committed
Add node.processors setting in favor of processors (elastic#45855)
This commit namespaces the existing processors setting under the "node" namespace. In doing so, we deprecate the existing processors setting in favor of node.processors.
1 parent 3393f95 commit 00d6170

File tree

12 files changed

+67
-18
lines changed

12 files changed

+67
-18
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
152152
public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
153153
NamedXContentRegistry xContentRegistry, Dispatcher dispatcher) {
154154
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher);
155-
Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings));
155+
Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings));
156156

157157
this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings);
158158
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public Netty4Transport(Settings settings, Version version, ThreadPool threadPool
112112
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
113113
CircuitBreakerService circuitBreakerService) {
114114
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
115-
Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings));
115+
Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings));
116116
this.workerCount = WORKER_COUNT.get(settings);
117117

118118
// See AdaptiveReceiveBufferSizePredictor#DEFAULT_XXX for default values in netty..., we can use higher ones for us, even fixed one

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,7 @@ public void apply(Settings value, Settings current, Settings previous) {
447447
Client.CLIENT_TYPE_SETTING_S,
448448
ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING,
449449
EsExecutors.PROCESSORS_SETTING,
450+
EsExecutors.NODE_PROCESSORS_SETTING,
450451
ThreadContext.DEFAULT_HEADERS_SETTING,
451452
Loggers.LOG_DEFAULT_LEVEL_SETTING,
452453
Loggers.LOG_LEVEL_SETTING,

server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import java.util.concurrent.ThreadPoolExecutor;
4545
import java.util.concurrent.TimeUnit;
4646
import java.util.concurrent.atomic.AtomicInteger;
47+
import java.util.function.Function;
4748
import java.util.stream.Collectors;
4849

4950
public class EsExecutors {
@@ -56,19 +57,33 @@ public class EsExecutors {
5657
public static final Setting<Integer> PROCESSORS_SETTING = new Setting<>(
5758
"processors",
5859
s -> Integer.toString(Runtime.getRuntime().availableProcessors()),
59-
s -> {
60-
final int value = Setting.parseInt(s, 1, "processors");
60+
processorsParser("processors"),
61+
Property.Deprecated,
62+
Property.NodeScope);
63+
64+
/**
65+
* Setting to manually set the number of available processors. This setting is used to adjust thread pool sizes per node.
66+
*/
67+
public static final Setting<Integer> NODE_PROCESSORS_SETTING = new Setting<>(
68+
"node.processors",
69+
PROCESSORS_SETTING,
70+
processorsParser("node.processors"),
71+
Property.NodeScope);
72+
73+
private static Function<String, Integer> processorsParser(final String name) {
74+
return s -> {
75+
final int value = Setting.parseInt(s, 1, name);
6176
final int availableProcessors = Runtime.getRuntime().availableProcessors();
6277
if (value > availableProcessors) {
6378
deprecationLogger.deprecatedAndMaybeLog(
6479
"processors",
65-
"setting processors to value [{}] which is more than available processors [{}] is deprecated",
80+
"setting [" + name + "] to value [{}] which is more than available processors [{}] is deprecated",
6681
value,
6782
availableProcessors);
6883
}
6984
return value;
70-
},
71-
Property.NodeScope);
85+
};
86+
}
7287

7388
/**
7489
* Returns the number of available processors. Defaults to
@@ -79,7 +94,7 @@ public class EsExecutors {
7994
* @return the number of available processors
8095
*/
8196
public static int numberOfProcessors(final Settings settings) {
82-
return PROCESSORS_SETTING.get(settings);
97+
return NODE_PROCESSORS_SETTING.get(settings);
8398
}
8499

85100
public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory,

server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ public void testValuesSmokeScreen() throws IOException, ExecutionException, Inte
203203

204204
public void testAllocatedProcessors() throws Exception {
205205
// start one node with 7 processors.
206-
internalCluster().startNode(Settings.builder().put(EsExecutors.PROCESSORS_SETTING.getKey(), 7).build());
206+
internalCluster().startNode(Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 7).build());
207207
waitForNodes(1);
208208

209209
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();

server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919

2020
package org.elasticsearch.common.util.concurrent;
2121

22+
import org.elasticsearch.common.settings.Setting;
2223
import org.elasticsearch.common.settings.Settings;
2324
import org.elasticsearch.test.ESTestCase;
2425
import org.hamcrest.Matcher;
2526

27+
import java.util.Locale;
2628
import java.util.concurrent.CountDownLatch;
2729
import java.util.concurrent.CyclicBarrier;
2830
import java.util.concurrent.ThreadPoolExecutor;
@@ -32,6 +34,7 @@
3234
import static org.hamcrest.Matchers.anyOf;
3335
import static org.hamcrest.Matchers.containsString;
3436
import static org.hamcrest.Matchers.equalTo;
37+
import static org.hamcrest.Matchers.hasToString;
3538
import static org.hamcrest.Matchers.lessThan;
3639

3740
/**
@@ -388,4 +391,32 @@ public void testGetTasks() throws InterruptedException {
388391
}
389392
}
390393

394+
public void testNodeProcessorsBound() {
395+
runProcessorsBoundTest(EsExecutors.NODE_PROCESSORS_SETTING);
396+
}
397+
398+
public void testProcessorsBound() {
399+
runProcessorsBoundTest(EsExecutors.PROCESSORS_SETTING);
400+
}
401+
402+
private void runProcessorsBoundTest(final Setting<Integer> processorsSetting) {
403+
final int available = Runtime.getRuntime().availableProcessors();
404+
final int processors = randomIntBetween(available + 1, Integer.MAX_VALUE);
405+
final Settings settings = Settings.builder().put(processorsSetting.getKey(), processors).build();
406+
processorsSetting.get(settings);
407+
final Setting<?>[] deprecatedSettings;
408+
if (processorsSetting.getProperties().contains(Setting.Property.Deprecated)) {
409+
deprecatedSettings = new Setting<?>[]{processorsSetting};
410+
} else {
411+
deprecatedSettings = new Setting<?>[0];
412+
}
413+
final String expectedWarning = String.format(
414+
Locale.ROOT,
415+
"setting [%s] to value [%d] which is more than available processors [%d] is deprecated",
416+
processorsSetting.getKey(),
417+
processors,
418+
available);
419+
assertSettingDeprecationsAndWarnings(deprecatedSettings, expectedWarning);
420+
}
421+
391422
}

server/src/test/java/org/elasticsearch/index/MergeSchedulerSettingsTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import org.elasticsearch.common.settings.Settings;
3232
import org.elasticsearch.test.ESTestCase;
3333

34-
import static org.elasticsearch.common.util.concurrent.EsExecutors.PROCESSORS_SETTING;
34+
import static org.elasticsearch.common.util.concurrent.EsExecutors.NODE_PROCESSORS_SETTING;
3535
import static org.elasticsearch.index.IndexSettingsTests.newIndexMeta;
3636
import static org.elasticsearch.index.MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING;
3737
import static org.elasticsearch.index.MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING;
@@ -139,7 +139,7 @@ private static IndexMetaData createMetaData(int maxThreadCount, int maxMergeCoun
139139
builder.put(MAX_MERGE_COUNT_SETTING.getKey(), maxMergeCount);
140140
}
141141
if (numProc != -1) {
142-
builder.put(PROCESSORS_SETTING.getKey(), numProc);
142+
builder.put(NODE_PROCESSORS_SETTING.getKey(), numProc);
143143
}
144144
return newIndexMeta("index", builder.build());
145145
}

server/src/test/java/org/elasticsearch/indices/IndicesServiceCloseTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ private Node startNode() throws NodeValidationException {
7070
.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), createTempDir().getParent())
7171
.put(Node.NODE_NAME_SETTING.getKey(), nodeName)
7272
.put(ScriptService.SCRIPT_MAX_COMPILATIONS_RATE.getKey(), "1000/1m")
73-
.put(EsExecutors.PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created
73+
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created
7474
.put("transport.type", getTestTransportType())
7575
.put(Node.NODE_DATA_SETTING.getKey(), true)
7676
.put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), random().nextLong())

server/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,8 @@ public void testNodesInfosTotalIndexingBuffer() throws Exception {
114114

115115
public void testAllocatedProcessors() throws Exception {
116116
List<String> nodesIds = internalCluster().startNodes(
117-
Settings.builder().put(EsExecutors.PROCESSORS_SETTING.getKey(), 3).build(),
118-
Settings.builder().put(EsExecutors.PROCESSORS_SETTING.getKey(), 6).build()
117+
Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 3).build(),
118+
Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 6).build()
119119
);
120120

121121
final String node_1 = nodesIds.get(0);

server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public void testScalingThreadPoolConfiguration() throws InterruptedException {
5454
if (randomBoolean()) {
5555
final int processors = randomIntBetween(1, 64);
5656
maxBasedOnNumberOfProcessors = expectedSize(threadPoolName, processors);
57-
builder.put("processors", processors);
57+
builder.put("node.processors", processors);
5858
processorsUsed = processors;
5959
} else {
6060
maxBasedOnNumberOfProcessors = expectedSize(threadPoolName, availableProcessors);
@@ -99,7 +99,7 @@ public void testScalingThreadPoolConfiguration() throws InterruptedException {
9999
});
100100

101101
if (processorsUsed > availableProcessors) {
102-
assertWarnings("setting processors to value [" + processorsUsed +
102+
assertWarnings("setting node.processors to value [" + processorsUsed +
103103
"] which is more than available processors [" + availableProcessors + "] is deprecated");
104104
}
105105
}

0 commit comments

Comments
 (0)