Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ private void registerBuiltinClusterSettings() {
registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT, Validator.TIME_NON_NEGATIVE);
registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT, Validator.TIME_NON_NEGATIVE);
registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_MAX_SIZE_PER_SEC, Validator.BYTES_SIZE);
registerClusterDynamicSetting(ThreadPool.THREADPOOL_GROUP + "*", Validator.EMPTY);
registerClusterDynamicSetting(ThreadPool.THREADPOOL_GROUP + "*", ThreadPool.THREAD_POOL_TYPE_SETTINGS_VALIDATOR);
registerClusterDynamicSetting(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, Validator.INTEGER);
registerClusterDynamicSetting(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, Validator.INTEGER);
registerClusterDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, Validator.EMPTY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ private Table buildTable(RestRequest req, ClusterStateResponse state, NodesInfoR
}
}

table.addCell(poolInfo == null ? null : poolInfo.getType());
table.addCell(poolInfo == null ? null : poolInfo.getThreadPoolType());
table.addCell(poolStats == null ? null : poolStats.getActive());
table.addCell(poolStats == null ? null : poolStats.getThreads());
table.addCell(poolStats == null ? null : poolStats.getQueue());
Expand Down
275 changes: 203 additions & 72 deletions core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public class SearchWithRejectionsIT extends ESIntegTestCase {
@Override
public Settings nodeSettings(int nodeOrdinal) {
return settingsBuilder().put(super.nodeSettings(nodeOrdinal))
.put("threadpool.search.type", "fixed")
.put("threadpool.search.size", 1)
.put("threadpool.search.queue_size", 1)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,28 +46,21 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.regex.Pattern;

import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.Matchers.*;

/**
*/
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0)
public class SimpleThreadPoolIT extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put("threadpool.search.type", "cached").build();
return Settings.settingsBuilder().build();
}

public void testThreadNames() throws Exception {
Expand Down Expand Up @@ -130,59 +123,51 @@ public void testUpdatingThreadPoolSettings() throws Exception {
internalCluster().startNodesAsync(2).get();
ThreadPool threadPool = internalCluster().getDataNodeInstance(ThreadPool.class);
// Check that settings are changed
assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(5L));
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.keep_alive", "10m").build()).execute().actionGet();
assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getQueue().remainingCapacity(), equalTo(1000));
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.queue_size", 2000).build()).execute().actionGet();
assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getQueue().remainingCapacity(), equalTo(2000));

// Make sure that threads continue executing when executor is replaced
final CyclicBarrier barrier = new CyclicBarrier(2);
Executor oldExecutor = threadPool.executor(Names.SEARCH);
threadPool.executor(Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
try {
barrier.await();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (BrokenBarrierException ex) {
//
}
}
});
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.type", "fixed").build()).execute().actionGet();
threadPool.executor(Names.SEARCH).execute(() -> {
try {
barrier.await();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (BrokenBarrierException ex) {
//
}
});
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.queue_size", 1000).build()).execute().actionGet();
assertThat(threadPool.executor(Names.SEARCH), not(sameInstance(oldExecutor)));
assertThat(((ThreadPoolExecutor) oldExecutor).isShutdown(), equalTo(true));
assertThat(((ThreadPoolExecutor) oldExecutor).isTerminating(), equalTo(true));
assertThat(((ThreadPoolExecutor) oldExecutor).isTerminated(), equalTo(false));
barrier.await(10, TimeUnit.SECONDS);

// Make sure that new thread executor is functional
threadPool.executor(Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
try {
barrier.await();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (BrokenBarrierException ex) {
//
threadPool.executor(Names.SEARCH).execute(() -> {
try {
barrier.await();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (BrokenBarrierException ex) {
//
}
}
}
});
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.type", "fixed").build()).execute().actionGet();
);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can move this to not be on its own line?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 36e6975ba23cc50d5f66cbdd847233fe5d379705.

client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.queue_size", 500)).execute().actionGet();
barrier.await(10, TimeUnit.SECONDS);

// This was here: Thread.sleep(200);
// Why? What was it for?

// Check that node info is correct
NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo().all().execute().actionGet();
for (int i = 0; i < 2; i++) {
NodeInfo nodeInfo = nodesInfoResponse.getNodes()[i];
boolean found = false;
for (ThreadPool.Info info : nodeInfo.getThreadPool()) {
if (info.getName().equals(Names.SEARCH)) {
assertThat(info.getType(), equalTo("fixed"));
assertEquals(info.getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED);
found = true;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.threadpool;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
Expand All @@ -30,7 +31,9 @@
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

import java.io.IOException;
import java.util.Map;

import static org.elasticsearch.common.settings.Settings.settingsBuilder;
Expand All @@ -44,9 +47,16 @@
*/
public class ThreadPoolSerializationTests extends ESTestCase {
BytesStreamOutput output = new BytesStreamOutput();
private ThreadPool.ThreadPoolType threadPoolType;

@Before
public void setUp() throws Exception {
super.setUp();
threadPoolType = randomFrom(ThreadPool.ThreadPoolType.values());
}

public void testThatQueueSizeSerializationWorks() throws Exception {
ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), SizeValue.parseSizeValue("10k"));
ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType, 1, 10, TimeValue.timeValueMillis(3000), SizeValue.parseSizeValue("10k"));
output.setVersion(Version.CURRENT);
info.writeTo(output);

Expand All @@ -58,7 +68,7 @@ public void testThatQueueSizeSerializationWorks() throws Exception {
}

public void testThatNegativeQueueSizesCanBeSerialized() throws Exception {
ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), null);
ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType, 1, 10, TimeValue.timeValueMillis(3000), null);
output.setVersion(Version.CURRENT);
info.writeTo(output);

Expand All @@ -70,7 +80,7 @@ public void testThatNegativeQueueSizesCanBeSerialized() throws Exception {
}

public void testThatToXContentWritesOutUnboundedCorrectly() throws Exception {
ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), null);
ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType, 1, 10, TimeValue.timeValueMillis(3000), null);
XContentBuilder builder = jsonBuilder();
builder.startObject();
info.toXContent(builder, ToXContent.EMPTY_PARAMS);
Expand All @@ -95,7 +105,7 @@ public void testThatNegativeSettingAllowsToStart() throws InterruptedException {
}

public void testThatToXContentWritesInteger() throws Exception {
ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), SizeValue.parseSizeValue("1k"));
ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType, 1, 10, TimeValue.timeValueMillis(3000), SizeValue.parseSizeValue("1k"));
XContentBuilder builder = jsonBuilder();
builder.startObject();
info.toXContent(builder, ToXContent.EMPTY_PARAMS);
Expand All @@ -111,4 +121,16 @@ public void testThatToXContentWritesInteger() throws Exception {
assertThat(map, hasKey("queue_size"));
assertThat(map.get("queue_size").toString(), is("1000"));
}

public void testThatThreadPoolTypeIsSerializedCorrectly() throws IOException {
ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType);
output.setVersion(Version.CURRENT);
info.writeTo(output);

StreamInput input = StreamInput.wrap(output.bytes());
ThreadPool.Info newInfo = new ThreadPool.Info();
newInfo.readFrom(input);

assertThat(newInfo.getThreadPoolType(), is(threadPoolType));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.elasticsearch.threadpool;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.settings.Validator;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

import java.util.*;

import static org.junit.Assert.*;

public class ThreadPoolTypeSettingsValidatorTests extends ESTestCase {
private Validator validator;

@Before
public void setUp() throws Exception {
super.setUp();
validator = ThreadPool.THREAD_POOL_TYPE_SETTINGS_VALIDATOR;
}

public void testValidThreadPoolTypeSettings() {
for (Map.Entry<String, ThreadPool.ThreadPoolType> entry : ThreadPool.THREAD_POOL_TYPES.entrySet()) {
assertNull(validateSetting(validator, entry.getKey(), entry.getValue().getType()));
}
}

public void testInvalidThreadPoolTypeSettings() {
for (Map.Entry<String, ThreadPool.ThreadPoolType> entry : ThreadPool.THREAD_POOL_TYPES.entrySet()) {
Set<ThreadPool.ThreadPoolType> set = new HashSet<>();
set.addAll(Arrays.asList(ThreadPool.ThreadPoolType.values()));
set.remove(entry.getValue());
ThreadPool.ThreadPoolType invalidThreadPoolType = randomFrom(set.toArray(new ThreadPool.ThreadPoolType[set.size()]));
String expectedMessage = String.format(
Locale.ROOT,
"thread pool type for [%s] can only be updated to [%s] but was [%s]",
entry.getKey(),
entry.getValue().getType(),
invalidThreadPoolType.getType());
String message = validateSetting(validator, entry.getKey(), invalidThreadPoolType.getType());
assertNotNull(message);
assertEquals(expectedMessage, message);
}
}

public void testNonThreadPoolTypeSetting() {
String setting = ThreadPool.THREADPOOL_GROUP + randomAsciiOfLength(10) + "foo";
String value = randomAsciiOfLength(10);
assertNull(validator.validate(setting, value, ClusterState.PROTO));
}

private String validateSetting(Validator validator, String threadPoolName, String value) {
return validator.validate(ThreadPool.THREADPOOL_GROUP + threadPoolName + ".type", value, ClusterState.PROTO);
}
}
Loading