Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions server/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ public synchronized void updateMetaData(final IndexMetaData currentIndexMetaData
// once we change the refresh interval we schedule yet another refresh
// to ensure we are in a clean and predictable state.
// it doesn't matter if we move from or to <code>-1</code> in both cases we want
// docs to become visible immediately. This also flushes all pending indexing / search reqeusts
// docs to become visible immediately. This also flushes all pending indexing / search requests
// that are waiting for a refresh.
threadPool.executor(ThreadPool.Names.REFRESH).execute(new AbstractRunnable() {
@Override
Expand Down Expand Up @@ -829,17 +829,20 @@ private void sync(final Consumer<IndexShard> sync, final String source) {
}

abstract static class BaseAsyncTask extends AbstractAsyncTask {

protected final IndexService indexService;

BaseAsyncTask(IndexService indexService, TimeValue interval) {
BaseAsyncTask(final IndexService indexService, final TimeValue interval) {
super(indexService.logger, indexService.threadPool, interval, true);
this.indexService = indexService;
rescheduleIfNecessary();
}

@Override
protected boolean mustReschedule() {
// don't re-schedule if its closed or if we don't have a single shard here..., we are done
return indexService.closed.get() == false;
// don't re-schedule if the IndexService instance is closed or if the index is closed
return indexService.closed.get() == false
&& indexService.indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
Expand All @@ -47,6 +48,7 @@
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.test.InternalSettingsPlugin.TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.core.IsEqual.equalTo;

/** Unit test(s) for IndexService */
Expand Down Expand Up @@ -109,22 +111,49 @@ protected String getThreadPool() {
latch2.get().countDown();
assertEquals(2, count.get());


task = new IndexService.BaseAsyncTask(indexService, TimeValue.timeValueMillis(1000000)) {
@Override
protected void runInternal() {

}
};
assertTrue(task.mustReschedule());

// now close the index
final Index index = indexService.index();
assertAcked(client().admin().indices().prepareClose(index.getName()));
awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index));

final IndexService closedIndexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
assertNotSame(indexService, closedIndexService);
assertFalse(task.mustReschedule());
assertFalse(task.isClosed());
assertEquals(1000000, task.getInterval().millis());

// now reopen the index
assertAcked(client().admin().indices().prepareOpen(index.getName()));
awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index));
indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
assertNotSame(closedIndexService, indexService);

task = new IndexService.BaseAsyncTask(indexService, TimeValue.timeValueMillis(100000)) {
@Override
protected void runInternal() {

}
};
assertTrue(task.mustReschedule());
assertFalse(task.isClosed());
assertTrue(task.isScheduled());

indexService.close("simon says", false);
assertFalse("no shards left", task.mustReschedule());
assertTrue(task.isScheduled());
task.close();
assertFalse(task.isScheduled());
}

public void testRefreshTaskIsUpdated() throws IOException {
public void testRefreshTaskIsUpdated() throws Exception {
IndexService indexService = createIndex("test", Settings.EMPTY);
IndexService.AsyncRefreshTask refreshTask = indexService.getRefreshTask();
assertEquals(1000, refreshTask.getInterval().millis());
Expand Down Expand Up @@ -167,12 +196,35 @@ public void testRefreshTaskIsUpdated() throws IOException {
assertTrue(refreshTask.isScheduled());
assertFalse(refreshTask.isClosed());
assertEquals(200, refreshTask.getInterval().millis());

// now close the index
final Index index = indexService.index();
assertAcked(client().admin().indices().prepareClose(index.getName()));
awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index));

final IndexService closedIndexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
assertNotSame(indexService, closedIndexService);
assertNotSame(refreshTask, closedIndexService.getRefreshTask());
assertFalse(closedIndexService.getRefreshTask().mustReschedule());
assertFalse(closedIndexService.getRefreshTask().isClosed());
assertEquals(200, closedIndexService.getRefreshTask().getInterval().millis());

// now reopen the index
assertAcked(client().admin().indices().prepareOpen(index.getName()));
awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index));
indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
assertNotSame(closedIndexService, indexService);
refreshTask = indexService.getRefreshTask();
assertTrue(indexService.getRefreshTask().mustReschedule());
assertTrue(refreshTask.isScheduled());
assertFalse(refreshTask.isClosed());

indexService.close("simon says", false);
assertFalse(refreshTask.isScheduled());
assertTrue(refreshTask.isClosed());
}

public void testFsyncTaskIsRunning() throws IOException {
public void testFsyncTaskIsRunning() throws Exception {
Settings settings = Settings.builder()
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC).build();
IndexService indexService = createIndex("test", settings);
Expand All @@ -182,6 +234,28 @@ public void testFsyncTaskIsRunning() throws IOException {
assertTrue(fsyncTask.mustReschedule());
assertTrue(fsyncTask.isScheduled());

// now close the index
final Index index = indexService.index();
assertAcked(client().admin().indices().prepareClose(index.getName()));
awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index));

final IndexService closedIndexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
assertNotSame(indexService, closedIndexService);
assertNotSame(fsyncTask, closedIndexService.getFsyncTask());
assertFalse(closedIndexService.getFsyncTask().mustReschedule());
assertFalse(closedIndexService.getFsyncTask().isClosed());
assertEquals(5000, closedIndexService.getFsyncTask().getInterval().millis());

// now reopen the index
assertAcked(client().admin().indices().prepareOpen(index.getName()));
awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index));
indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
assertNotSame(closedIndexService, indexService);
fsyncTask = indexService.getFsyncTask();
assertTrue(indexService.getRefreshTask().mustReschedule());
assertTrue(fsyncTask.isScheduled());
assertFalse(fsyncTask.isClosed());

indexService.close("simon says", false);
assertFalse(fsyncTask.isScheduled());
assertTrue(fsyncTask.isClosed());
Expand Down