Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ public Collection<ExecutorBuilder> builders() {
}

public static Setting<TimeValue> ESTIMATED_TIME_INTERVAL_SETTING =
Setting.timeSetting("thread_pool.estimated_time_interval", TimeValue.timeValueMillis(200), Setting.Property.NodeScope);
Setting.timeSetting("thread_pool.estimated_time_interval",
TimeValue.timeValueMillis(200), TimeValue.ZERO, Setting.Property.NodeScope);

public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
assert Node.NODE_NAME_SETTING.exists(settings);
Expand Down Expand Up @@ -555,22 +556,36 @@ static class CachedTimeThread extends Thread {
/**
* Return the current time used for relative calculations. This is
* {@link System#nanoTime()} truncated to milliseconds.
* <p>
* If {@link ThreadPool#ESTIMATED_TIME_INTERVAL_SETTING} is set to 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make ThreadPool#ESTIMATED_TIME_INTERVAL_SETTING a setting that has 0 as an inclusive lower bound? I am fine with that in a follow up.

* then the cache is disabled and the method calls {@link System#nanoTime()}
* whenever called. Typically used for testing.
*/
long relativeTimeInMillis() {
return relativeMillis;
if (0 < interval) {
return relativeMillis;
}
return TimeValue.nsecToMSec(System.nanoTime());
}

/**
* Return the current epoch time, used to find absolute time. This is
* a cached version of {@link System#currentTimeMillis()}.
* <p>
* If {@link ThreadPool#ESTIMATED_TIME_INTERVAL_SETTING} is set to 0
* then the cache is disabled and the method calls {@link System#currentTimeMillis()}
* whenever called. Typically used for testing.
*/
long absoluteTimeInMillis() {
return absoluteMillis;
if (0 < interval) {
return absoluteMillis;
}
return System.currentTimeMillis();
}

@Override
public void run() {
while (running) {
while (running && 0 < interval) {
relativeMillis = TimeValue.nsecToMSec(System.nanoTime());
absoluteMillis = System.currentTimeMillis();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.Collection;
Expand All @@ -47,6 +48,7 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.nullValue;

public class UpdateSettingsIT extends ESIntegTestCase {
Expand Down Expand Up @@ -126,6 +128,16 @@ public List<Setting<?>> getSettings() {
}
}

/**
* Needed by {@link UpdateSettingsIT#testEngineGCDeletesSetting()}
*/
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
.put("thread_pool.estimated_time_interval", 0)
.build();
}

public void testUpdateDependentClusterSettings() {
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () ->
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder()
Expand Down Expand Up @@ -435,23 +447,28 @@ public void testOpenCloseUpdateSettings() throws Exception {
assertThat(getSettingsResponse.getSetting("test", "index.final"), nullValue());
}

public void testEngineGCDeletesSetting() throws InterruptedException {
public void testEngineGCDeletesSetting() throws Exception {
createIndex("test");
client().prepareIndex("test", "type", "1").setSource("f", 1).get();
DeleteResponse response = client().prepareDelete("test", "type", "1").get();
long seqNo = response.getSeqNo();
long primaryTerm = response.getPrimaryTerm();
// delete is still in cache this should work
client().prepareIndex("test", "type", "1").setSource("f", 2).setIfSeqNo(seqNo).setIfPrimaryTerm(primaryTerm).get();
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put("index.gc_deletes", 0)).get();
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put("index.gc_deletes", 0)));

response = client().prepareDelete("test", "type", "1").get();
seqNo = response.getSeqNo();
Thread.sleep(300); // wait for cache time to change TODO: this needs to be solved better. To be discussed.

// Make sure the time has advanced for InternalEngine#resolveDocVersion()
for (ThreadPool threadPool : internalCluster().getInstances(ThreadPool.class)) {
long startTime = threadPool.relativeTimeInMillis();
assertBusy(() -> assertThat(threadPool.relativeTimeInMillis(), greaterThan(startTime)));
}

// delete is should not be in cache
assertThrows(client().prepareIndex("test", "type", "1").setSource("f", 3).setIfSeqNo(seqNo).setIfPrimaryTerm(primaryTerm),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn’t it be enough to assert busy and remove the sleep?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess so, this solution just makes more visible of what's happening and less "brute force".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal of the test is to make sure that once time moves the delete is forgotten. If we busy spin on the indexing request (instead of on time - which is what I think Jason refers to with sleep), we will have different semantics as some indexing ops may succeed, changing the dynamics of the test (it now will check that a CASed index operation fails if it's base is an index op, rather than a delete op).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, yeah. Didn't even think that the assertBusy would potentially execute the prepareIndex request multiple times.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bleskes Sorry to be unclear. I meant busy spin waiting for the cached time to advance. So instead of sleeping for it to happen, assert that it has happened, busily since it happens in the background.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

VersionConflictEngineException.class);

}

public void testUpdateSettingsWithBlocks() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package org.elasticsearch.threadpool;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;

import static org.elasticsearch.threadpool.ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING;
import static org.hamcrest.CoreMatchers.equalTo;

public class ThreadPoolTests extends ESTestCase {
Expand Down Expand Up @@ -59,4 +61,10 @@ public void testAbsoluteTime() throws Exception {
threadPool.close();
}
}

public void testEstimatedTimeIntervalSettingAcceptsOnlyZeroAndPositiveTime() {
Settings settings = Settings.builder().put("thread_pool.estimated_time_interval", -1).build();
Exception e = expectThrows(IllegalArgumentException.class, () -> ESTIMATED_TIME_INTERVAL_SETTING.get(settings));
assertEquals("failed to parse value [-1] for setting [thread_pool.estimated_time_interval], must be >= [0ms]", e.getMessage());
}
}