diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index fbc87f2279b3d..33ce83bd5f797 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -20,6 +20,7 @@ import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.codecs.Codec; +import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.search.QueryCache; import org.apache.lucene.search.QueryCachingPolicy; @@ -73,6 +74,7 @@ public final class EngineConfig { private final Sort indexSort; private final boolean forceNewHistoryUUID; private final TranslogRecoveryRunner translogRecoveryRunner; + private final RecoveryConfig recoveryConfig; /** * Index setting to change the low level lucene codec used for writing new segments. @@ -118,7 +120,7 @@ public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, Thr QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, boolean forceNewHistoryUUID, TranslogConfig translogConfig, TimeValue flushMergesAfter, List refreshListeners, Sort indexSort, - TranslogRecoveryRunner translogRecoveryRunner) { + TranslogRecoveryRunner translogRecoveryRunner, RecoveryConfig recoveryConfig) { if (openMode == null) { throw new IllegalArgumentException("openMode must not be null"); } @@ -147,6 +149,7 @@ public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, Thr this.refreshListeners = refreshListeners; this.indexSort = indexSort; this.translogRecoveryRunner = translogRecoveryRunner; + this.recoveryConfig = recoveryConfig; } /** @@ -358,4 +361,41 @@ public boolean isAutoGeneratedIDsOptimizationEnabled() { public Sort getIndexSort() { return indexSort; } + + /** + * This configuration is used in conjunction with {@link OpenMode} to specific how the engine should be started. + */ + public RecoveryConfig getRecoveryConfig() { + return recoveryConfig; + } + + public static class RecoveryConfig { + private final IndexCommit startingCommit; + private final long maxRecoveringSeqNo; + + // Starts an engine with the last commit and recover all possible translog operations. + public static final RecoveryConfig MOST_RECENT = new RecoveryConfig(null, Long.MAX_VALUE); + + public RecoveryConfig(IndexCommit startingCommit, long maxRecoveringSeqNo) { + this.startingCommit = startingCommit; + this.maxRecoveringSeqNo = maxRecoveringSeqNo; + } + + /** + * Returns a starting index commit that an {@link org.apache.lucene.index.IndexWriter} should open with. + * See {@link org.apache.lucene.index.IndexWriterConfig#setIndexCommit(IndexCommit)} + */ + public IndexCommit getStartingCommit() { + return startingCommit; + } + + /** + * Returns the maximum sequence number that an engine should recover from the local translog. + * Translog operations with a higher sequence number will be skipped. + * See {@link Engine#recoverFromTranslog()} + */ + public long getMaxRecoveringSeqNo() { + return maxRecoveringSeqNo; + } + } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 6449c979de40a..cb80e60ad4f3a 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; @@ -187,11 +188,12 @@ public InternalEngine(EngineConfig engineConfig) { case OPEN_INDEX_AND_TRANSLOG: writer = createWriter(false); final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath()); - seqNoStats = store.loadSeqNoStats(globalCheckpoint); + seqNoStats = store.loadSeqNoStatsFromCommit(globalCheckpoint, engineConfig.getRecoveryConfig().getStartingCommit()); break; case OPEN_INDEX_CREATE_TRANSLOG: writer = createWriter(false); - seqNoStats = store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO); + seqNoStats = store.loadSeqNoStatsFromCommit(SequenceNumbers.UNASSIGNED_SEQ_NO, + engineConfig.getRecoveryConfig().getStartingCommit()); break; case CREATE_INDEX_AND_TRANSLOG: writer = createWriter(true); @@ -405,11 +407,22 @@ public InternalEngine recoverFromTranslog() throws IOException { return this; } + private SegmentInfos recoveringSegmentInfo() { + try { + final IndexCommit recoveringCommit = config().getRecoveryConfig().getStartingCommit(); + return store.readCommittedSegmentsInfo(recoveringCommit); + } catch (IOException ex) { + throw new EngineCreationFailureException(shardId, "Failed to read SegmentInfos for the recovering commit", ex); + } + } + private void recoverFromTranslogInternal() throws IOException { + final SegmentInfos recoveringSegmentInfo = recoveringSegmentInfo(); Translog.TranslogGeneration translogGeneration = translog.getGeneration(); final int opsRecovered; - final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); - try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGen)) { + final long translogGen = Long.parseLong(recoveringSegmentInfo.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); + final long maxRecoveringSeqNo = config().getRecoveryConfig().getMaxRecoveringSeqNo(); + try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGen).filter(op -> op.seqNo() <= maxRecoveringSeqNo)) { opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot); } catch (Exception e) { throw new EngineException(shardId, "failed to recover from translog", e); @@ -424,12 +437,12 @@ private void recoverFromTranslogInternal() throws IOException { flush(true, true); refresh("translog_recovery"); } else if (translog.isCurrent(translogGeneration) == false) { - commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID)); + commitIndexWriter(indexWriter, translog, recoveringSegmentInfo.getUserData().get(Engine.SYNC_COMMIT_ID)); refreshLastCommittedSegmentInfos(); - } else if (lastCommittedSegmentInfos.getUserData().containsKey(HISTORY_UUID_KEY) == false) { + } else if (recoveringSegmentInfo.getUserData().containsKey(HISTORY_UUID_KEY) == false) { assert historyUUID != null; // put the history uuid into the index - commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID)); + commitIndexWriter(indexWriter, translog, recoveringSegmentInfo.getUserData().get(Engine.SYNC_COMMIT_ID)); refreshLastCommittedSegmentInfos(); } // clean up what's not needed @@ -1841,6 +1854,7 @@ private IndexWriterConfig getIndexWriterConfig(boolean create) { iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac()); iwc.setCodec(engineConfig.getCodec()); iwc.setUseCompoundFile(true); // always use compound on flush - reduces # of file-handles on refresh + iwc.setIndexCommit(engineConfig.getRecoveryConfig().getStartingCommit()); if (config().getIndexSort() != null) { iwc.setIndexSort(config().getIndexSort()); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 304764656b73f..ea3bc66630a29 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -58,7 +58,6 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -151,7 +150,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -1254,11 +1252,12 @@ public void performTranslogRecovery(boolean indexExists) throws IOException { translogStats.totalOperations(0); translogStats.totalOperationsOnStart(0); } - internalPerformTranslogRecovery(false, indexExists); + internalPerformTranslogRecovery(false, indexExists, EngineConfig.RecoveryConfig.MOST_RECENT); assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage(); } - private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists) throws IOException { + private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists, + EngineConfig.RecoveryConfig recoveryConfig) throws IOException { if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } @@ -1290,7 +1289,7 @@ private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boole assert indexExists == false || assertMaxUnsafeAutoIdInCommit(); - final EngineConfig config = newEngineConfig(openMode); + final EngineConfig config = newEngineConfig(openMode, recoveryConfig); // we disable deletes since we allow for operations to be executed against the shard while recovering // but we need to make sure we don't loose deletes until we are done recovering config.setEnableGcDeletes(false); @@ -1329,7 +1328,7 @@ protected void onNewEngine(Engine newEngine) { */ public void skipTranslogRecovery() throws IOException { assert getEngineOrNull() == null : "engine was already created"; - internalPerformTranslogRecovery(true, true); + internalPerformTranslogRecovery(true, true, EngineConfig.RecoveryConfig.MOST_RECENT); assert recoveryState.getTranslog().recoveredOperations() == 0; } @@ -2129,7 +2128,7 @@ private DocumentMapperForType docMapper(String type) { return mapperService.documentMapperWithAutoCreate(type); } - private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) { + private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode, EngineConfig.RecoveryConfig recoveryConfig) { Sort indexSort = indexSortSupplier.get(); final boolean forceNewHistoryUUID; switch (shardRouting.recoverySource().getType()) { @@ -2151,7 +2150,7 @@ private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) { indexCache.query(), cachingPolicy, forceNewHistoryUUID, translogConfig, IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), Arrays.asList(refreshListeners, new RefreshMetricUpdater(refreshMetric)), indexSort, - this::runTranslogRecovery); + this::runTranslogRecovery, recoveryConfig); } /** diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java index fa992e12ef220..1cd0bc961ea63 100644 --- a/core/src/main/java/org/elasticsearch/index/store/Store.java +++ b/core/src/main/java/org/elasticsearch/index/store/Store.java @@ -181,9 +181,16 @@ public Directory directory() { * @throws IOException if the index is corrupted or the segments file is not present */ public SegmentInfos readLastCommittedSegmentsInfo() throws IOException { + return readCommittedSegmentsInfo(null); + } + + /** + * Returns the segments info for a given commit or for the latest commit if the given commit is null. + */ + public SegmentInfos readCommittedSegmentsInfo(final IndexCommit commit) throws IOException { failIfCorrupted(); try { - return readSegmentsInfo(null, directory()); + return readSegmentsInfo(commit, directory()); } catch (CorruptIndexException ex) { markStoreCorrupted(ex); throw ex; @@ -219,11 +226,28 @@ private static SegmentInfos readSegmentsInfo(IndexCommit commit, Directory direc * @return an instance of {@link SeqNoStats} populated with the local and global checkpoints, and the maximum sequence number * @throws IOException if an I/O exception occurred reading the latest Lucene commit point from disk */ - public SeqNoStats loadSeqNoStats(final long globalCheckpoint) throws IOException { + public SeqNoStats loadSeqNoStatsFromLatestCommit(final long globalCheckpoint) throws IOException { final Map userData = SegmentInfos.readLatestCommit(directory).getUserData(); return SequenceNumbers.loadSeqNoStatsFromLuceneCommit(globalCheckpoint, userData.entrySet()); } + /** + * Loads the local checkpoint and the maximum sequence number from the given Lucene commit point and returns the triplet of local and + * global checkpoints, and maximum sequence number as an instance of {@link SeqNoStats}. The global checkpoint must be provided + * externally as it is not stored in the commit point. + * + * @param globalCheckpoint the provided global checkpoint + * @param commit if the provided commit is null, the latest commit in the store will be used. + * @return an instance of {@link SeqNoStats} populated with the local and global checkpoints, and the maximum sequence number + * @throws IOException if an I/O exception occurred reading the latest Lucene commit point from disk + */ + public SeqNoStats loadSeqNoStatsFromCommit(final long globalCheckpoint, final IndexCommit commit) throws IOException { + if (commit == null) { + return loadSeqNoStatsFromLatestCommit(globalCheckpoint); + } + return SequenceNumbers.loadSeqNoStatsFromLuceneCommit(globalCheckpoint, commit.getUserData().entrySet()); + } + final void ensureOpen() { if (this.refCounter.refCount() <= 0) { throw new AlreadyClosedException("store is already closed"); diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 4373c8d05398b..77a530445cd9a 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -65,6 +65,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.LongSupplier; +import java.util.function.Predicate; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Stream; @@ -840,6 +841,34 @@ public interface Snapshot extends Closeable { */ Translog.Operation next() throws IOException; + /** + * Returns a snapshot consisting of the elements of this snapshot that match the given predicate. + */ + default Snapshot filter(final Predicate predicate) { + final Snapshot originalSnapshot = this; + return new Snapshot() { + @Override + public int totalOperations() { + return originalSnapshot.totalOperations(); + } + + @Override + public Operation next() throws IOException { + Translog.Operation op; + while ((op = originalSnapshot.next()) != null) { + if (predicate.test(op)) { + return op; + } + } + return null; + } + + @Override + public void close() throws IOException { + originalSnapshot.close(); + } + }; + } } /** diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 65a8a0d0f6e0b..5aa3585107942 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -354,7 +354,7 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) { try { final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation()); - final SeqNoStats seqNoStats = recoveryTarget.store().loadSeqNoStats(globalCheckpoint); + final SeqNoStats seqNoStats = recoveryTarget.store().loadSeqNoStatsFromLatestCommit(globalCheckpoint); if (seqNoStats.getMaxSeqNo() <= seqNoStats.getGlobalCheckpoint()) { assert seqNoStats.getLocalCheckpoint() <= seqNoStats.getGlobalCheckpoint(); /* diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index eb0d4b8afa218..d247688701d8b 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -34,6 +34,7 @@ import org.apache.lucene.document.TextField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexDeletionPolicy; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; @@ -105,7 +106,6 @@ import org.elasticsearch.index.mapper.RootObjectMapper; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; -import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbersService; @@ -160,6 +160,7 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThan; @@ -2483,7 +2484,7 @@ public void testRecoverFromForeignTranslog() throws IOException { threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, TimeValue.timeValueMinutes(5), - config.getRefreshListeners(), null, config.getTranslogRecoveryRunner()); + config.getRefreshListeners(), null, config.getTranslogRecoveryRunner(), EngineConfig.RecoveryConfig.MOST_RECENT); try { InternalEngine internalEngine = new InternalEngine(brokenConfig); @@ -2537,7 +2538,7 @@ public void testHistoryUUIDIsSetIfMissing() throws IOException { threadPool, indexSettings, null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, config.getTranslogConfig(), TimeValue.timeValueMinutes(5), - config.getRefreshListeners(), null, config.getTranslogRecoveryRunner()); + config.getRefreshListeners(), null, config.getTranslogRecoveryRunner(), EngineConfig.RecoveryConfig.MOST_RECENT); engine = new InternalEngine(newConfig); if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { engine.recoverFromTranslog(); @@ -2567,7 +2568,7 @@ public void testHistoryUUIDCanBeForced() throws IOException { threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), true, config.getTranslogConfig(), TimeValue.timeValueMinutes(5), - config.getRefreshListeners(), null, config.getTranslogRecoveryRunner()); + config.getRefreshListeners(), null, config.getTranslogRecoveryRunner(), EngineConfig.RecoveryConfig.MOST_RECENT); engine = new InternalEngine(newConfig); if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { engine.recoverFromTranslog(); @@ -4025,4 +4026,106 @@ public void testSeqNoGenerator() throws IOException { } } + static class KeepAllIndexCommit extends IndexDeletionPolicy { + @Override + public void onInit(List commits) throws IOException { + + } + + @Override + public void onCommit(List commits) throws IOException { + + } + } + + static class KeepAllIndexWriterFactory implements IndexWriterFactory { + @Override + public IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { + iwc.setIndexDeletionPolicy(new KeepAllIndexCommit()); + return new IndexWriter(directory, iwc); + } + } + + public void testRecoveringFromStartingCommit() throws Exception { + IOUtils.close(engine, store); + final Path translogPath = createTempDir(); + try (Store store = createStore()) { + final int initDocs = scaledRandomIntBetween(10, 1000); + try (Engine engine = + openEngine(store, translogPath, new KeepAllIndexWriterFactory(), EngineConfig.RecoveryConfig.MOST_RECENT)) { + addDocs(engine, 0, initDocs); + } + final List commits = DirectoryReader.listCommits(store.directory()); + assertThat(commits, not(empty())); + + try (Engine engine = openEngine(store, translogPath, + new KeepAllIndexWriterFactory(), new EngineConfig.RecoveryConfig(randomFrom(commits), Long.MAX_VALUE))) { + try (Searcher searcher = engine.acquireSearcher("test")) { + assertThat(searcher.reader().numDocs(), equalTo(initDocs)); + } + final int moreDocs = scaledRandomIntBetween(10, 1000); + addDocs(engine, initDocs, moreDocs); + engine.refresh("test"); + try (Searcher searcher = engine.acquireSearcher("test")) { + assertThat(searcher.reader().numDocs(), equalTo(initDocs + moreDocs)); + } + } + } + } + + public void testRecoveringUpToMaxSeqNo() throws Exception { + IOUtils.close(engine, store); + final Path translogPath = createTempDir(); + try (Store store = createStore()) { + final int initDocs = scaledRandomIntBetween(10, 1000); + final int moreDocs = scaledRandomIntBetween(10, 1000); + final int extraDocs = scaledRandomIntBetween(10, 1000); + final List initCommits; + try (Engine engine = + openEngine(store, translogPath, new KeepAllIndexWriterFactory(), EngineConfig.RecoveryConfig.MOST_RECENT)) { + addDocs(engine, 0, initDocs); + initCommits = DirectoryReader.listCommits(store.directory()); + assertThat(initCommits, not(empty())); + addDocs(engine, initDocs, moreDocs); + } + final int maxRecoveringSeqNo = between(initDocs, initDocs + moreDocs - 1); + try (Engine engine = openEngine(store, translogPath, new KeepAllIndexWriterFactory(), + new EngineConfig.RecoveryConfig(randomFrom(initCommits), maxRecoveringSeqNo - 1))) { + try (Searcher searcher = engine.acquireSearcher("test")) { + assertThat(searcher.reader().numDocs(), equalTo(maxRecoveringSeqNo)); + } + addDocs(engine, initDocs + moreDocs, extraDocs); + engine.flush(true, true); + } + try (Engine engine = + openEngine(store, translogPath, new KeepAllIndexWriterFactory(), EngineConfig.RecoveryConfig.MOST_RECENT)) { + try (Searcher searcher = engine.acquireSearcher("test")) { + assertThat(searcher.reader().numDocs(), equalTo(maxRecoveringSeqNo + extraDocs)); + } + } + } + } + + void addDocs(Engine engine, int startIndex, int numDocs) throws IOException { + for (int i = 0; i < numDocs; i++) { + engine.index(indexForDoc(createParsedDoc(Integer.toString(startIndex + i), null))); + if (frequently()) { + engine.flush(true, true); + } + if (rarely()) { + engine.rollTranslogGeneration(); + } + } + } + + private InternalEngine openEngine(Store store, Path translogPath, IndexWriterFactory indexWriterFactory, + EngineConfig.RecoveryConfig recoveryConfig) throws IOException { + final EngineConfig config = config(defaultSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null, recoveryConfig); + InternalEngine internalEngine = createInternalEngine(indexWriterFactory, null, null, config); + if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { + internalEngine.recoverFromTranslog(); + } + return internalEngine; + } + } diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index fcc3c93fc37c3..dd1009d7c27b7 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -53,7 +53,6 @@ import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.Scheduler.Cancellable; @@ -120,7 +119,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) { EngineConfig config = new EngineConfig(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, shardId, allocationId, threadPool, indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, - TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), null, null); + TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), null, null, + EngineConfig.RecoveryConfig.MOST_RECENT); engine = new InternalEngine(config); listeners.setTranslog(engine.getTranslog()); } diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 1a17e0dc6a06f..c1da18b3dfdc3 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -116,6 +116,7 @@ import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.isIn; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.nullValue; @LuceneTestCase.SuppressFileSystems("ExtrasFS") public class TranslogTests extends ESTestCase { @@ -2534,4 +2535,30 @@ public void testAcquiredLockIsPassedToDeletionPolicy() throws IOException { } } } + + public void testFilterSnapshot() throws Exception { + final int operations = scaledRandomIntBetween(100, 4096); + long seqNo = 0; + for (int i = 0; i < operations; i++) { + translog.add(new Translog.NoOp(seqNo++, 0, "test")); + if (rarely()) { + translog.rollGeneration(); + } + } + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + assertThat(snapshot, SnapshotMatchers.size((int) seqNo)); + } + + final long maxSeqNo = randomLongBetween(1, seqNo); + try (Translog.Snapshot snapshot = translog.newSnapshot().filter(op -> op.seqNo() < maxSeqNo)) { + assertThat(snapshot, SnapshotMatchers.size((int) maxSeqNo)); + } + + final long singleOp = randomLongBetween(1, seqNo); + try (Translog.Snapshot snapshot = translog.newSnapshot().filter(op -> op.seqNo() == singleOp)) { + final Translog.Operation op = snapshot.next(); + assertThat(op.seqNo(), equalTo(singleOp)); + assertThat(snapshot.next(), nullValue()); + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 5c2ef977b163e..8083d03c7bd39 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -162,7 +162,7 @@ public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, An config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), config.getForceNewHistoryUUID(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(), - config.getIndexSort(), config.getTranslogRecoveryRunner()); + config.getIndexSort(), config.getTranslogRecoveryRunner(), config.getRecoveryConfig()); } @Override @@ -376,6 +376,13 @@ public EngineConfig config(IndexSettings indexSettings, Store store, Path transl public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, ReferenceManager.RefreshListener refreshListener, Sort indexSort) { + return config(indexSettings, store, translogPath, mergePolicy, refreshListener, indexSort, + EngineConfig.RecoveryConfig.MOST_RECENT); + } + + public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, + ReferenceManager.RefreshListener refreshListener, Sort indexSort, + EngineConfig.RecoveryConfig recoveryConfig) { IndexWriterConfig iwc = newIndexWriterConfig(); TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); final EngineConfig.OpenMode openMode; @@ -401,7 +408,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) { EngineConfig config = new EngineConfig(openMode, shardId, allocationId.getId(), threadPool, indexSettings, null, store, mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, - TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler); + TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler, recoveryConfig); return config; }