Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 51 additions & 1 deletion core/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final List<SearchOperationListener> searchOperationListeners;
private volatile AsyncRefreshTask refreshTask;
private volatile AsyncTranslogFSync fsyncTask;

// don't convert to Setting<> and register... we only set this in tests and register via a plugin
private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval";

private final AsyncTrimTranslogTask trimTranslogTask;
private final ThreadPool threadPool;
private final BigArrays bigArrays;
private final ScriptService scriptService;
Expand Down Expand Up @@ -177,6 +182,7 @@ public IndexService(
this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners);
// kick off async ops for the first shard in this index
this.refreshTask = new AsyncRefreshTask(this);
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
rescheduleFsyncTask(indexSettings.getTranslogDurability());
}

Expand Down Expand Up @@ -629,7 +635,6 @@ private void rescheduleRefreshTasks() {
} finally {
refreshTask = new AsyncRefreshTask(this);
}

}

public interface ShardStoreDeleter {
Expand Down Expand Up @@ -693,6 +698,28 @@ private void maybeRefreshEngine() {
}
}

private void maybeTrimTranslog() {
for (IndexShard shard : this.shards.values()) {
switch (shard.state()) {
case CREATED:
case RECOVERING:
case CLOSED:
continue;
case POST_RECOVERY:
case STARTED:
case RELOCATED:
try {
shard.trimTranslog();
} catch (IndexShardClosedException | AlreadyClosedException ex) {
// fine - continue;
}
continue;
default:
throw new IllegalStateException("unknown state: " + shard.state());
}
}
}

abstract static class BaseAsyncTask implements Runnable, Closeable {
protected final IndexService indexService;
protected final ThreadPool threadPool;
Expand Down Expand Up @@ -837,6 +864,29 @@ public String toString() {
}
}

final class AsyncTrimTranslogTask extends BaseAsyncTask {

AsyncTrimTranslogTask(IndexService indexService) {
super(indexService, indexService.getIndexSettings()
.getSettings().getAsTime(INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING, TimeValue.timeValueMinutes(10)));
}

@Override
protected void runInternal() {
indexService.maybeTrimTranslog();
}

@Override
protected String getThreadPool() {
return ThreadPool.Names.GENERIC;
}

@Override
public String toString() {
return "trim_translog";
}
}

AsyncRefreshTask getRefreshTask() { // for tests
return refreshTask;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -803,12 +803,18 @@ public final boolean refreshNeeded() {
*/
public abstract CommitId flush() throws EngineException;


/**
* checks and removes translog files that no longer need to be retained. See
* {@link org.elasticsearch.index.translog.TranslogDeletionPolicy} for details
*/
public abstract void trimTranslog() throws EngineException;

/**
* Rolls the translog generation and cleans unneeded.
*/
public abstract void rollTranslogGeneration() throws EngineException;


/**
* Force merges to 1 segment
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1353,6 +1353,24 @@ public void rollTranslogGeneration() throws EngineException {
}
}

@Override
public void trimTranslog() throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
translog.trimUnreferencedReaders();
} catch (AlreadyClosedException e) {
failOnTragicEvent(e);
throw e;
} catch (Exception e) {
try {
failEngine("translog trimming failed", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new EngineException(shardId, "failed to trim translog", e);
}
}

private void pruneDeletedTombstones() {
long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();

Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,16 @@ public Engine.CommitId flush(FlushRequest request) {
return commitId;
}

/**
* checks and removes translog files that no longer need to be retained. See
* {@link org.elasticsearch.index.translog.TranslogDeletionPolicy} for details
*/
public void trimTranslog() {
verifyNotClosed();
final Engine engine = getEngine();
engine.trimTranslog();
}

/**
* Rolls the tranlog generation and cleans unneeded.
*/
Expand Down
35 changes: 32 additions & 3 deletions core/src/test/java/org/elasticsearch/index/IndexServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -34,19 +33,29 @@
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.test.InternalSettingsPlugin.TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING;
import static org.hamcrest.core.IsEqual.equalTo;

/** Unit test(s) for IndexService */
public class IndexServiceTests extends ESSingleNodeTestCase {

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return Collections.singleton(InternalSettingsPlugin.class);
}

public static CompressedXContent filter(QueryBuilder filterBuilder) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();
filterBuilder.toXContent(builder, ToXContent.EMPTY_PARAMS);
Expand Down Expand Up @@ -263,6 +272,26 @@ public void testRescheduleAsyncFsync() throws Exception {
assertNotNull(indexService.getFsyncTask());
}

public void testAsyncTranslogTrimActuallyWorks() throws Exception {
Settings settings = Settings.builder()
.put(TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING.getKey(), "100ms") // very often :)
.build();
IndexService indexService = createIndex("test", settings);
ensureGreen("test");
assertTrue(indexService.getRefreshTask().mustReschedule());
client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON).get();
client().admin().indices().prepareFlush("test").get();
IndexMetaData metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder()
.put(indexService.getMetaData().getSettings())
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), -1)
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), -1))
.build();
indexService.updateMetaData(metaData);

IndexShard shard = indexService.getShard(0);
assertBusy(() -> assertThat(shard.getTranslog().totalOperations(), equalTo(0)));
}

public void testIllegalFsyncInterval() {
Settings settings = Settings.builder()
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "0ms") // disable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugins.Plugin;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public final class InternalSettingsPlugin extends Plugin {

Expand All @@ -36,10 +38,13 @@ public final class InternalSettingsPlugin extends Plugin {
Setting.boolSetting("index.merge.enabled", true, Property.IndexScope, Property.NodeScope);
public static final Setting<Long> INDEX_CREATION_DATE_SETTING =
Setting.longSetting(IndexMetaData.SETTING_CREATION_DATE, -1, -1, Property.IndexScope, Property.NodeScope);
public static final Setting<TimeValue> TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING =
Setting.timeSetting("index.translog.retention.check_interval", new TimeValue(10, TimeUnit.MINUTES),
new TimeValue(-1, TimeUnit.MILLISECONDS), Property.Dynamic, Property.IndexScope);

@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(VERSION_CREATED, MERGE_ENABLED,
INDEX_CREATION_DATE_SETTING, PROVIDED_NAME_SETTING);
INDEX_CREATION_DATE_SETTING, PROVIDED_NAME_SETTING, TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING);
}
}