Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
Expand Down Expand Up @@ -73,6 +74,7 @@ public final class EngineConfig {
private final Sort indexSort;
private final boolean forceNewHistoryUUID;
private final TranslogRecoveryRunner translogRecoveryRunner;
private final RecoveryConfig recoveryConfig;

/**
* Index setting to change the low level lucene codec used for writing new segments.
Expand Down Expand Up @@ -118,7 +120,7 @@ public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, Thr
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
boolean forceNewHistoryUUID, TranslogConfig translogConfig, TimeValue flushMergesAfter,
List<ReferenceManager.RefreshListener> refreshListeners, Sort indexSort,
TranslogRecoveryRunner translogRecoveryRunner) {
TranslogRecoveryRunner translogRecoveryRunner, RecoveryConfig recoveryConfig) {
if (openMode == null) {
throw new IllegalArgumentException("openMode must not be null");
}
Expand Down Expand Up @@ -147,6 +149,7 @@ public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, Thr
this.refreshListeners = refreshListeners;
this.indexSort = indexSort;
this.translogRecoveryRunner = translogRecoveryRunner;
this.recoveryConfig = recoveryConfig;
}

/**
Expand Down Expand Up @@ -358,4 +361,41 @@ public boolean isAutoGeneratedIDsOptimizationEnabled() {
/**
 * Returns the sort order applied to segments of this index, or {@code null} if no index sort is configured.
 */
public Sort getIndexSort() {
    return indexSort;
}

/**
 * This configuration is used in conjunction with {@link OpenMode} to specify how the engine should be started.
 */
public RecoveryConfig getRecoveryConfig() {
    return recoveryConfig;
}

/**
 * Describes how an engine should be recovered: which {@link IndexCommit} to open and the upper bound
 * on translog sequence numbers to replay. Used in conjunction with {@link OpenMode} when an engine is started.
 */
public static class RecoveryConfig {
    // null selects the most recent commit in the store.
    private final IndexCommit startingCommit;
    private final long maxRecoveringSeqNo;

    /** Starts an engine with the last commit and recovers all possible translog operations. */
    public static final RecoveryConfig MOST_RECENT = new RecoveryConfig(null, Long.MAX_VALUE);

    /**
     * @param startingCommit     the index commit the engine should open; {@code null} to open the latest commit
     * @param maxRecoveringSeqNo the highest sequence number to recover from the translog; operations with a
     *                           higher sequence number are skipped
     */
    public RecoveryConfig(IndexCommit startingCommit, long maxRecoveringSeqNo) {
        this.startingCommit = startingCommit;
        this.maxRecoveringSeqNo = maxRecoveringSeqNo;
    }

    /**
     * Returns a starting index commit that an {@link org.apache.lucene.index.IndexWriter} should open with.
     * See {@link org.apache.lucene.index.IndexWriterConfig#setIndexCommit(IndexCommit)}
     */
    public IndexCommit getStartingCommit() {
        return startingCommit;
    }

    /**
     * Returns the maximum sequence number that an engine should recover from the local translog.
     * Translog operations with a higher sequence number will be skipped.
     * See {@link Engine#recoverFromTranslog()}
     */
    public long getMaxRecoveringSeqNo() {
        return maxRecoveringSeqNo;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
Expand Down Expand Up @@ -187,11 +188,12 @@ public InternalEngine(EngineConfig engineConfig) {
case OPEN_INDEX_AND_TRANSLOG:
writer = createWriter(false);
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
seqNoStats = store.loadSeqNoStats(globalCheckpoint);
seqNoStats = store.loadSeqNoStatsFromCommit(globalCheckpoint, engineConfig.getRecoveryConfig().getStartingCommit());
break;
case OPEN_INDEX_CREATE_TRANSLOG:
writer = createWriter(false);
seqNoStats = store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
seqNoStats = store.loadSeqNoStatsFromCommit(SequenceNumbers.UNASSIGNED_SEQ_NO,
engineConfig.getRecoveryConfig().getStartingCommit());
break;
case CREATE_INDEX_AND_TRANSLOG:
writer = createWriter(true);
Expand Down Expand Up @@ -405,11 +407,22 @@ public InternalEngine recoverFromTranslog() throws IOException {
return this;
}

/**
 * Reads the {@link SegmentInfos} of the commit the engine is recovering from, wrapping any I/O failure
 * in an {@link EngineCreationFailureException}.
 */
private SegmentInfos recoveringSegmentInfo() {
    // A null starting commit selects the latest commit in the store.
    final IndexCommit startingCommit = config().getRecoveryConfig().getStartingCommit();
    try {
        return store.readCommittedSegmentsInfo(startingCommit);
    } catch (IOException ex) {
        throw new EngineCreationFailureException(shardId, "Failed to read SegmentInfos for the recovering commit", ex);
    }
}

private void recoverFromTranslogInternal() throws IOException {
final SegmentInfos recoveringSegmentInfo = recoveringSegmentInfo();
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGen)) {
final long translogGen = Long.parseLong(recoveringSegmentInfo.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
final long maxRecoveringSeqNo = config().getRecoveryConfig().getMaxRecoveringSeqNo();
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGen).filter(op -> op.seqNo() <= maxRecoveringSeqNo)) {
opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot);
} catch (Exception e) {
throw new EngineException(shardId, "failed to recover from translog", e);
Expand All @@ -424,12 +437,12 @@ private void recoverFromTranslogInternal() throws IOException {
flush(true, true);
refresh("translog_recovery");
} else if (translog.isCurrent(translogGeneration) == false) {
commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
commitIndexWriter(indexWriter, translog, recoveringSegmentInfo.getUserData().get(Engine.SYNC_COMMIT_ID));
refreshLastCommittedSegmentInfos();
} else if (lastCommittedSegmentInfos.getUserData().containsKey(HISTORY_UUID_KEY) == false) {
} else if (recoveringSegmentInfo.getUserData().containsKey(HISTORY_UUID_KEY) == false) {
assert historyUUID != null;
// put the history uuid into the index
commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
commitIndexWriter(indexWriter, translog, recoveringSegmentInfo.getUserData().get(Engine.SYNC_COMMIT_ID));
refreshLastCommittedSegmentInfos();
}
// clean up what's not needed
Expand Down Expand Up @@ -1841,6 +1854,7 @@ private IndexWriterConfig getIndexWriterConfig(boolean create) {
iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
iwc.setCodec(engineConfig.getCodec());
iwc.setUseCompoundFile(true); // always use compound on flush - reduces # of file-handles on refresh
iwc.setIndexCommit(engineConfig.getRecoveryConfig().getStartingCommit());
if (config().getIndexSort() != null) {
iwc.setIndexSort(config().getIndexSort());
}
Expand Down
15 changes: 7 additions & 8 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -151,7 +150,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
Expand Down Expand Up @@ -1254,11 +1252,12 @@ public void performTranslogRecovery(boolean indexExists) throws IOException {
translogStats.totalOperations(0);
translogStats.totalOperationsOnStart(0);
}
internalPerformTranslogRecovery(false, indexExists);
internalPerformTranslogRecovery(false, indexExists, EngineConfig.RecoveryConfig.MOST_RECENT);
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
}

private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists) throws IOException {
private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists,
EngineConfig.RecoveryConfig recoveryConfig) throws IOException {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
Expand Down Expand Up @@ -1290,7 +1289,7 @@ private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boole

assert indexExists == false || assertMaxUnsafeAutoIdInCommit();

final EngineConfig config = newEngineConfig(openMode);
final EngineConfig config = newEngineConfig(openMode, recoveryConfig);
// we disable deletes since we allow for operations to be executed against the shard while recovering
// but we need to make sure we don't loose deletes until we are done recovering
config.setEnableGcDeletes(false);
Expand Down Expand Up @@ -1329,7 +1328,7 @@ protected void onNewEngine(Engine newEngine) {
*/
public void skipTranslogRecovery() throws IOException {
assert getEngineOrNull() == null : "engine was already created";
internalPerformTranslogRecovery(true, true);
internalPerformTranslogRecovery(true, true, EngineConfig.RecoveryConfig.MOST_RECENT);
assert recoveryState.getTranslog().recoveredOperations() == 0;
}

Expand Down Expand Up @@ -2129,7 +2128,7 @@ private DocumentMapperForType docMapper(String type) {
return mapperService.documentMapperWithAutoCreate(type);
}

private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) {
private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode, EngineConfig.RecoveryConfig recoveryConfig) {
Sort indexSort = indexSortSupplier.get();
final boolean forceNewHistoryUUID;
switch (shardRouting.recoverySource().getType()) {
Expand All @@ -2151,7 +2150,7 @@ private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) {
indexCache.query(), cachingPolicy, forceNewHistoryUUID, translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Arrays.asList(refreshListeners, new RefreshMetricUpdater(refreshMetric)), indexSort,
this::runTranslogRecovery);
this::runTranslogRecovery, recoveryConfig);
}

/**
Expand Down
28 changes: 26 additions & 2 deletions core/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,16 @@ public Directory directory() {
* @throws IOException if the index is corrupted or the segments file is not present
*/
public SegmentInfos readLastCommittedSegmentsInfo() throws IOException {
    // Delegates with a null commit, which selects the most recent commit in the store.
    return readCommittedSegmentsInfo(null);
}

/**
* Returns the segments info for a given commit or for the latest commit if the given commit is null.
*/
public SegmentInfos readCommittedSegmentsInfo(final IndexCommit commit) throws IOException {
failIfCorrupted();
try {
return readSegmentsInfo(null, directory());
return readSegmentsInfo(commit, directory());
} catch (CorruptIndexException ex) {
markStoreCorrupted(ex);
throw ex;
Expand Down Expand Up @@ -219,11 +226,28 @@ private static SegmentInfos readSegmentsInfo(IndexCommit commit, Directory direc
* @return an instance of {@link SeqNoStats} populated with the local and global checkpoints, and the maximum sequence number
* @throws IOException if an I/O exception occurred reading the latest Lucene commit point from disk
*/
public SeqNoStats loadSeqNoStats(final long globalCheckpoint) throws IOException {
/**
 * Loads the local checkpoint and the maximum sequence number from the latest Lucene commit point and returns them,
 * together with the provided global checkpoint, as an instance of {@link SeqNoStats}. The global checkpoint must be
 * provided externally as it is not stored in the commit point.
 *
 * @param globalCheckpoint the provided global checkpoint
 * @return an instance of {@link SeqNoStats} populated with the local and global checkpoints, and the maximum sequence number
 * @throws IOException if an I/O exception occurred reading the latest Lucene commit point from disk
 */
public SeqNoStats loadSeqNoStatsFromLatestCommit(final long globalCheckpoint) throws IOException {
    final Map<String, String> userData = SegmentInfos.readLatestCommit(directory).getUserData();
    return SequenceNumbers.loadSeqNoStatsFromLuceneCommit(globalCheckpoint, userData.entrySet());
}

/**
 * Loads the local checkpoint and the maximum sequence number from the supplied Lucene commit point and combines
 * them with the externally provided global checkpoint into a {@link SeqNoStats} instance. The global checkpoint
 * is passed in because it is not stored in the commit point.
 *
 * @param globalCheckpoint the provided global checkpoint
 * @param commit if the provided commit is null, the latest commit in the store will be used.
 * @return an instance of {@link SeqNoStats} populated with the local and global checkpoints, and the maximum sequence number
 * @throws IOException if an I/O exception occurred reading the latest Lucene commit point from disk
 */
public SeqNoStats loadSeqNoStatsFromCommit(final long globalCheckpoint, final IndexCommit commit) throws IOException {
    if (commit != null) {
        return SequenceNumbers.loadSeqNoStatsFromLuceneCommit(globalCheckpoint, commit.getUserData().entrySet());
    }
    // No explicit commit given: fall back to the latest commit in the store.
    return loadSeqNoStatsFromLatestCommit(globalCheckpoint);
}

final void ensureOpen() {
if (this.refCounter.refCount() <= 0) {
throw new AlreadyClosedException("store is already closed");
Expand Down
29 changes: 29 additions & 0 deletions core/src/main/java/org/elasticsearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
Expand Down Expand Up @@ -840,6 +841,34 @@ public interface Snapshot extends Closeable {
*/
Translog.Operation next() throws IOException;

/**
 * Returns a snapshot that yields only the operations of this snapshot accepted by the given predicate.
 */
default Snapshot filter(final Predicate<Translog.Operation> predicate) {
    final Snapshot delegate = this;
    return new Snapshot() {
        @Override
        public int totalOperations() {
            // Reports the delegate's total; operations rejected by the predicate
            // are still counted here even though next() skips them.
            return delegate.totalOperations();
        }

        @Override
        public Operation next() throws IOException {
            // Pull from the delegate until an operation passes the predicate or the snapshot is exhausted.
            for (Operation candidate = delegate.next(); candidate != null; candidate = delegate.next()) {
                if (predicate.test(candidate)) {
                    return candidate;
                }
            }
            return null;
        }

        @Override
        public void close() throws IOException {
            delegate.close();
        }
    };
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) {
try {
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation());
final SeqNoStats seqNoStats = recoveryTarget.store().loadSeqNoStats(globalCheckpoint);
final SeqNoStats seqNoStats = recoveryTarget.store().loadSeqNoStatsFromLatestCommit(globalCheckpoint);
if (seqNoStats.getMaxSeqNo() <= seqNoStats.getGlobalCheckpoint()) {
assert seqNoStats.getLocalCheckpoint() <= seqNoStats.getGlobalCheckpoint();
/*
Expand Down
Loading