Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
NamedXContentRegistry xContentRegistry, Dispatcher dispatcher) {
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher);
Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings));
Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings));

this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings);
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public Netty4Transport(Settings settings, Version version, ThreadPool threadPool
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService) {
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings));
Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings));
this.workerCount = WORKER_COUNT.get(settings);

// See AdaptiveReceiveBufferSizePredictor#DEFAULT_XXX for default values in netty..., we can use higher ones for us, even fixed one
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ public void apply(Settings value, Settings current, Settings previous) {
Client.CLIENT_TYPE_SETTING_S,
ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING,
EsExecutors.PROCESSORS_SETTING,
EsExecutors.NODE_PROCESSORS_SETTING,
ThreadContext.DEFAULT_HEADERS_SETTING,
Loggers.LOG_DEFAULT_LEVEL_SETTING,
Loggers.LOG_LEVEL_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1053,6 +1053,15 @@ public static Setting<Integer> intSetting(String key, Setting<Integer> fallbackS
return new Setting<>(key, fallbackSetting, (s) -> parseInt(s, minValue, key), properties);
}

public static Setting<Integer> intSetting(
final String key,
final Setting<Integer> fallbackSetting,
final int minValue,
final int maxValue,
final Property... properties) {
return new Setting<>(key, fallbackSetting, (s) -> parseInt(s, minValue, maxValue, key), properties);
}

public static Setting<Integer> intSetting(String key, Setting<Integer> fallbackSetting, int minValue, Validator<Integer> validator,
Property... properties) {
return new Setting<>(new SimpleKey(key), fallbackSetting, fallbackSetting::getRaw, (s) -> parseInt(s, minValue, key),validator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,23 @@

public class EsExecutors {

/**
* Setting to manually set the number of available processors. This setting is used to adjust thread pool sizes per node.
*/
public static final Setting<Integer> PROCESSORS_SETTING = Setting.intSetting(
"processors",
Runtime.getRuntime().availableProcessors(),
1,
Runtime.getRuntime().availableProcessors(),
Property.Deprecated,
Property.NodeScope);

/**
* Setting to manually set the number of available processors. This setting is used to adjust thread pool sizes per node.
*/
// TODO: when removing "processors" setting, the default value is Runtime.getRuntime().availableProcessors()
public static final Setting<Integer> NODE_PROCESSORS_SETTING = Setting.intSetting(
"node.processors",
PROCESSORS_SETTING,
1,
Runtime.getRuntime().availableProcessors(),
Property.NodeScope);

/**
Expand All @@ -65,7 +74,7 @@ public class EsExecutors {
* @return the number of available processors
*/
public static int numberOfProcessors(final Settings settings) {
return PROCESSORS_SETTING.get(settings);
return NODE_PROCESSORS_SETTING.get(settings);
}

public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public void testValuesSmokeScreen() throws IOException, ExecutionException, Inte

public void testAllocatedProcessors() throws Exception {
// start one node with 7 processors.
internalCluster().startNode(Settings.builder().put(EsExecutors.PROCESSORS_SETTING.getKey(), 7).build());
internalCluster().startNode(Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 7).build());
waitForNodes(1);

ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matcher;

import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -389,15 +391,30 @@ public void testGetTasks() throws InterruptedException {
}
}

public void testNodeProcessorsBound() {
runProcessorsBoundTest(EsExecutors.NODE_PROCESSORS_SETTING);
}

public void testProcessorsBound() {
runProcessorsBoundTest(EsExecutors.PROCESSORS_SETTING);
}

private void runProcessorsBoundTest(final Setting<Integer> processorsSetting) {
final int available = Runtime.getRuntime().availableProcessors();
final int processors = randomIntBetween(available + 1, Integer.MAX_VALUE);
final Settings settings = Settings.builder().put("processors", processors).build();
final Settings settings = Settings.builder().put(processorsSetting.getKey(), processors).build();
final IllegalArgumentException e =
expectThrows(IllegalArgumentException.class, () -> EsExecutors.PROCESSORS_SETTING.get(settings));
assertThat(
e,
hasToString(containsString("Failed to parse value [" + processors + "] for setting [processors] must be <= " + available)));
expectThrows(IllegalArgumentException.class, () -> processorsSetting.get(settings));
final String expected = String.format(
Locale.ROOT,
"Failed to parse value [%d] for setting [%s] must be <= %d",
processors,
processorsSetting.getKey(),
available);
assertThat(e, hasToString(containsString(expected)));
if (processorsSetting.getProperties().contains(Setting.Property.Deprecated)) {
assertSettingDeprecationsAndWarnings(new Setting<?>[]{processorsSetting});
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;

import static org.elasticsearch.common.util.concurrent.EsExecutors.PROCESSORS_SETTING;
import static org.elasticsearch.common.util.concurrent.EsExecutors.NODE_PROCESSORS_SETTING;
import static org.elasticsearch.index.IndexSettingsTests.newIndexMeta;
import static org.elasticsearch.index.MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING;
import static org.elasticsearch.index.MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING;
Expand Down Expand Up @@ -139,7 +139,7 @@ private static IndexMetaData createMetaData(int maxThreadCount, int maxMergeCoun
builder.put(MAX_MERGE_COUNT_SETTING.getKey(), maxMergeCount);
}
if (numProc != -1) {
builder.put(PROCESSORS_SETTING.getKey(), numProc);
builder.put(NODE_PROCESSORS_SETTING.getKey(), numProc);
}
return newIndexMeta("index", builder.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private Node startNode() throws NodeValidationException {
.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), createTempDir().getParent())
.put(Node.NODE_NAME_SETTING.getKey(), nodeName)
.put(ScriptService.SCRIPT_MAX_COMPILATIONS_RATE.getKey(), "1000/1m")
.put(EsExecutors.PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created
.put("transport.type", getTestTransportType())
.put(Node.NODE_DATA_SETTING.getKey(), true)
.put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), random().nextLong())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ public void testNodesInfosTotalIndexingBuffer() throws Exception {

public void testAllocatedProcessors() throws Exception {
List<String> nodesIds = internalCluster().startNodes(
Settings.builder().put(EsExecutors.PROCESSORS_SETTING.getKey(), 3).build(),
Settings.builder().put(EsExecutors.PROCESSORS_SETTING.getKey(), 6).build()
Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 3).build(),
Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 6).build()
);

final String node_1 = nodesIds.get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void testScalingThreadPoolConfiguration() throws InterruptedException {
if (randomBoolean()) {
processors = randomIntBetween(1, availableProcessors);
maxBasedOnNumberOfProcessors = expectedSize(threadPoolName, processors);
builder.put("processors", processors);
builder.put("node.processors", processors);
} else {
maxBasedOnNumberOfProcessors = expectedSize(threadPoolName, availableProcessors);
processors = availableProcessors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ private Node newNode() {
.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), createTempDir().getParent())
.put(Node.NODE_NAME_SETTING.getKey(), nodeName)
.put(ScriptService.SCRIPT_MAX_COMPILATIONS_RATE.getKey(), "1000/1m")
.put(EsExecutors.PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created
.put("transport.type", getTestTransportType())
.put(TransportSettings.PORT.getKey(), ESTestCase.getPortRange())
.put(Node.NODE_DATA_SETTING.getKey(), true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,9 @@ private static Settings getRandomNodeSettings(long seed) {
builder.put(SearchService.DEFAULT_KEEPALIVE_SETTING.getKey(), timeValueSeconds(100 + random.nextInt(5 * 60)).getStringRep());
}

builder.put(EsExecutors.PROCESSORS_SETTING.getKey(), 1 + random.nextInt(Math.min(4, Runtime.getRuntime().availableProcessors())));
builder.put(
EsExecutors.NODE_PROCESSORS_SETTING.getKey(),
1 + random.nextInt(Math.min(4, Runtime.getRuntime().availableProcessors())));
if (random.nextBoolean()) {
if (random.nextBoolean()) {
builder.put("indices.fielddata.cache.size", 1 + random.nextInt(1000), ByteSizeUnit.MB);
Expand Down