diff --git a/build.gradle b/build.gradle
index cfc8401a934e0..7b1e517a8586b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -186,7 +186,7 @@ task verifyVersions {
  * after the backport of the backcompat code is complete. */
 allprojects {
-  ext.bwc_tests_enabled = true
+  ext.bwc_tests_enabled = false
 }
 task verifyBwcTestsEnabled {
diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
index 9b304de6077fc..a755044c11334 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -95,6 +95,7 @@ public abstract class Engine implements Closeable {
     public static final String SYNC_COMMIT_ID = "sync_id";
+    public static final String HISTORY_UUID_KEY = "history_uuid";
     protected final ShardId shardId;
     protected final String allocationId;
@@ -183,6 +184,9 @@ public MergeStats getMergeStats() {
         return new MergeStats();
     }
+    /** returns the history uuid for the engine */
+    public abstract String getHistoryUUID();
+
     /**
      * A throttling class that can be activated, causing the
      * {@code acquireThrottle} method to block on a lock when throttling
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index e1bf949f50eab..d7cf3e16069e1 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -48,6 +48,7 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lucene.LoggerInfoStream;
 import org.elasticsearch.common.lucene.Lucene;
@@ -142,6 +143,8 @@ public class InternalEngine extends Engine {
     private final CounterMetric numVersionLookups = new CounterMetric();
     private final CounterMetric numIndexVersionsLookups = new CounterMetric();
+    @Nullable
+    private final String historyUUID;
     public InternalEngine(EngineConfig engineConfig) throws EngineException {
         super(engineConfig);
@@ -174,15 +177,23 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
             switch (openMode) {
                 case OPEN_INDEX_AND_TRANSLOG:
                     writer = createWriter(false);
+                    String existingHistoryUUID = loadHistoryUUIDFromCommit(writer);
+                    if (existingHistoryUUID == null) {
+                        historyUUID = UUIDs.randomBase64UUID();
+                    } else {
+                        historyUUID = existingHistoryUUID;
+                    }
                     final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
                     seqNoStats = store.loadSeqNoStats(globalCheckpoint);
                     break;
                 case OPEN_INDEX_CREATE_TRANSLOG:
                     writer = createWriter(false);
+                    historyUUID = loadHistoryUUIDFromCommit(writer);
                     seqNoStats = store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
                     break;
                 case CREATE_INDEX_AND_TRANSLOG:
                     writer = createWriter(true);
+                    historyUUID = UUIDs.randomBase64UUID();
                     seqNoStats = new SeqNoStats(
                         SequenceNumbers.NO_OPS_PERFORMED,
                         SequenceNumbers.NO_OPS_PERFORMED,
@@ -342,6 +353,12 @@ private void recoverFromTranslogInternal() throws IOException {
             flush(true, true);
         } else if (translog.isCurrent(translogGeneration) == false) {
             commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
+            refreshLastCommittedSegmentInfos();
+        } else if (lastCommittedSegmentInfos.getUserData().containsKey(HISTORY_UUID_KEY) == false) {
+            assert historyUUID != null;
+            // put the history uuid into the index
+            commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
+            refreshLastCommittedSegmentInfos();
         }
         // clean up what's not needed
         translog.trimUnreferencedReaders();
@@ -382,6 +399,11 @@ public Translog getTranslog() {
         return translog;
     }
+    @Override
+    public String getHistoryUUID() {
+        return historyUUID;
+    }
+
     /**
      * Reads the current stored translog ID from the IW commit data. If the id is not found, recommits the current
      * translog id into lucene and returns null.
@@ -401,6 +423,19 @@ private String loadTranslogUUIDFromCommit(IndexWriter writer) throws IOException
         }
     }
+    /**
+     * Reads the current stored history ID from the IW commit data. If the id is not found, returns null.
+     */
+    @Nullable
+    private String loadHistoryUUIDFromCommit(final IndexWriter writer) throws IOException {
+        String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY);
+        if (uuid == null) {
+            assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1) :
+                "index was created after 6_0_0_rc1 but has no history uuid";
+        }
+        return uuid;
+    }
+
     private SearcherManager createSearcherManager() throws EngineException {
         boolean success = false;
         SearcherManager searcherManager = null;
@@ -1312,30 +1347,8 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
                 } catch (Exception e) {
                     throw new FlushFailedEngineException(shardId, e);
                 }
-                /*
-                 * we have to inc-ref the store here since if the engine is closed by a tragic event
-                 * we don't acquire the write lock and wait until we have exclusive access. This might also
-                 * dec the store reference which can essentially close the store and unless we can inc the reference
-                 * we can't use it.
-                 */
-                store.incRef();
-                try {
-                    // reread the last committed segment infos
-                    lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
-                } catch (Exception e) {
-                    if (isClosed.get() == false) {
-                        try {
-                            logger.warn("failed to read latest segment infos on flush", e);
-                        } catch (Exception inner) {
-                            e.addSuppressed(inner);
-                        }
-                        if (Lucene.isCorruptionException(e)) {
-                            throw new FlushFailedEngineException(shardId, e);
-                        }
-                    }
-                } finally {
-                    store.decRef();
-                }
+                refreshLastCommittedSegmentInfos();
+            }
             newCommitId = lastCommittedSegmentInfos.getId();
         } catch (FlushFailedEngineException ex) {
@@ -1353,6 +1366,33 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
         return new CommitId(newCommitId);
     }
+    private void refreshLastCommittedSegmentInfos() {
+        /*
+         * we have to inc-ref the store here since if the engine is closed by a tragic event
+         * we don't acquire the write lock and wait until we have exclusive access. This might also
+         * dec the store reference which can essentially close the store and unless we can inc the reference
+         * we can't use it.
+         */
+        store.incRef();
+        try {
+            // reread the last committed segment infos
+            lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
+        } catch (Exception e) {
+            if (isClosed.get() == false) {
+                try {
+                    logger.warn("failed to read latest segment infos on flush", e);
+                } catch (Exception inner) {
+                    e.addSuppressed(inner);
+                }
+                if (Lucene.isCorruptionException(e)) {
+                    throw new FlushFailedEngineException(shardId, e);
+                }
+            }
+        } finally {
+            store.decRef();
+        }
+    }
+
     @Override
     public void rollTranslogGeneration() throws EngineException {
         try (ReleasableLock ignored = readLock.acquire()) {
@@ -1874,7 +1914,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
          * {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
          * of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
          */
-        final Map commitData = new HashMap<>(5);
+        final Map commitData = new HashMap<>(6);
         commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration);
         commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
         commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue);
@@ -1883,6 +1923,9 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
         }
         commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
         commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
+        if (historyUUID != null) {
+            commitData.put(HISTORY_UUID_KEY, historyUUID);
+        }
         logger.trace("committing writer with commit data [{}]", commitData);
         return commitData.entrySet().iterator();
     });
@@ -1992,7 +2035,7 @@ public boolean isRecovering() {
      * Gets the commit data from {@link IndexWriter} as a map.
      */
     private static Map commitDataAsMap(final IndexWriter indexWriter) {
-        Map commitData = new HashMap<>(5);
+        Map commitData = new HashMap<>(6);
         for (Map.Entry entry : indexWriter.getLiveCommitData()) {
             commitData.put(entry.getKey(), entry.getValue());
         }
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 34ed1b4ce9e35..dd47be5a141dd 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1585,6 +1585,10 @@ public Translog getTranslog() {
         return getEngine().getTranslog();
     }
+    public String getHistoryUUID() {
+        return getEngine().getHistoryUUID();
+    }
+
     public IndexEventListener getIndexEventListener() {
         return indexEventListener;
     }
diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
index 078e8b06d6e20..63b7bc0805581 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
@@ -35,10 +35,12 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
+import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineException;
 import org.elasticsearch.index.engine.InternalEngine;
 import org.elasticsearch.index.mapper.MapperService;
@@ -162,10 +164,11 @@ void addIndices(
              * document-level semantics.
              */
             writer.setLiveCommitData(() -> {
-                final HashMap liveCommitData = new HashMap<>(2);
+                final HashMap liveCommitData = new HashMap<>(4);
                 liveCommitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
                 liveCommitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
                 liveCommitData.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp));
+                liveCommitData.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
                 return liveCommitData.entrySet().iterator();
             });
             writer.commit();
diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java
index 6700a005c9c96..fa992e12ef220 100644
--- a/core/src/main/java/org/elasticsearch/index/store/Store.java
+++ b/core/src/main/java/org/elasticsearch/index/store/Store.java
@@ -79,6 +79,7 @@ import org.elasticsearch.index.shard.AbstractIndexShardComponent;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.translog.Translog;
 import java.io.Closeable;
 import java.io.EOFException;
@@ -1027,6 +1028,20 @@ public Map getCommitUserData() {
             return commitUserData;
         }
+        /**
+         * returns the history uuid the store points at, or null if nonexistent.
+         */
+        public String getHistoryUUID() {
+            return commitUserData.get(Engine.HISTORY_UUID_KEY);
+        }
+
+        /**
+         * returns the translog uuid the store points at
+         */
+        public String getTranslogUUID() {
+            return commitUserData.get(Translog.TRANSLOG_UUID_KEY);
+        }
+
         /**
          * Returns true iff this metadata contains the given file.
         */
diff --git a/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java b/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java
index 325f840bd7c30..d9b77f841ed09 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java
@@ -25,6 +25,8 @@ import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.store.Lock;
@@ -37,9 +39,11 @@ import org.elasticsearch.cli.EnvironmentAwareCommand;
 import org.elasticsearch.cli.Terminal;
 import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.io.PathUtils;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import java.io.IOException;
@@ -51,6 +55,7 @@ import java.nio.file.StandardCopyOption;
 import java.nio.file.StandardOpenOption;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -101,64 +106,82 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th
         if (Files.exists(idxLocation) == false || Files.isDirectory(idxLocation) == false) {
             throw new ElasticsearchException("unable to find a shard at [" + idxLocation + "], which must exist and be a directory");
         }
-
-        // Hold the lock open for the duration of the tool running
-        try (Directory dir = FSDirectory.open(idxLocation, NativeFSLockFactory.INSTANCE);
-             Lock writeLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
-            Set translogFiles;
-            try {
-                terminal.println("Checking existing translog files");
-                translogFiles = filesInDirectory(translogPath);
-            } catch (IOException e) {
-                terminal.println("encountered IOException while listing directory, aborting...");
-                throw new ElasticsearchException("failed to find existing translog files", e);
-            }
-
-            // Warn about ES being stopped and files being deleted
-            warnAboutDeletingFiles(terminal, translogFiles, batch);
-
-            List commits;
-            try {
-                terminal.println("Reading translog UUID information from Lucene commit from shard at [" + idxLocation + "]");
-                commits = DirectoryReader.listCommits(dir);
-            } catch (IndexNotFoundException infe) {
-                throw new ElasticsearchException("unable to find a valid shard at [" + idxLocation + "]", infe);
-            }
-
-            // Retrieve the generation and UUID from the existing data
-            Map commitData = commits.get(commits.size() - 1).getUserData();
-            String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY);
-            String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY);
-            if (translogGeneration == null || translogUUID == null) {
-                throw new ElasticsearchException("shard must have a valid translog generation and UUID but got: [{}] and: [{}]",
+        try (Directory dir = FSDirectory.open(idxLocation, NativeFSLockFactory.INSTANCE)) {
+            final String historyUUID = UUIDs.randomBase64UUID();
+            final Map commitData;
+            // Hold the lock open for the duration of the tool running
+            try (Lock writeLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
+                Set translogFiles;
+                try {
terminal.println("Checking existing translog files"); + translogFiles = filesInDirectory(translogPath); + } catch (IOException e) { + terminal.println("encountered IOException while listing directory, aborting..."); + throw new ElasticsearchException("failed to find existing translog files", e); + } + + // Warn about ES being stopped and files being deleted + warnAboutDeletingFiles(terminal, translogFiles, batch); + + List commits; + try { + terminal.println("Reading translog UUID information from Lucene commit from shard at [" + idxLocation + "]"); + commits = DirectoryReader.listCommits(dir); + } catch (IndexNotFoundException infe) { + throw new ElasticsearchException("unable to find a valid shard at [" + idxLocation + "]", infe); + } + + // Retrieve the generation and UUID from the existing data + commitData = commits.get(commits.size() - 1).getUserData(); + String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY); + String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY); + if (translogGeneration == null || translogUUID == null) { + throw new ElasticsearchException("shard must have a valid translog generation and UUID but got: [{}] and: [{}]", translogGeneration, translogUUID); + } + terminal.println("Translog Generation: " + translogGeneration); + terminal.println("Translog UUID : " + translogUUID); + terminal.println("History UUID : " + historyUUID); + + Path tempEmptyCheckpoint = translogPath.resolve("temp-" + Translog.CHECKPOINT_FILE_NAME); + Path realEmptyCheckpoint = translogPath.resolve(Translog.CHECKPOINT_FILE_NAME); + Path tempEmptyTranslog = translogPath.resolve("temp-" + Translog.TRANSLOG_FILE_PREFIX + + translogGeneration + Translog.TRANSLOG_FILE_SUFFIX); + Path realEmptyTranslog = translogPath.resolve(Translog.TRANSLOG_FILE_PREFIX + + translogGeneration + Translog.TRANSLOG_FILE_SUFFIX); + + // Write empty checkpoint and translog to empty files + long gen = Long.parseLong(translogGeneration); + int translogLen = writeEmptyTranslog(tempEmptyTranslog, translogUUID); + writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen); + + terminal.println("Removing existing translog files"); + IOUtils.rm(translogFiles.toArray(new Path[]{})); + + terminal.println("Creating new empty checkpoint at [" + realEmptyCheckpoint + "]"); + Files.move(tempEmptyCheckpoint, realEmptyCheckpoint, StandardCopyOption.ATOMIC_MOVE); + terminal.println("Creating new empty translog at [" + realEmptyTranslog + "]"); + Files.move(tempEmptyTranslog, realEmptyTranslog, StandardCopyOption.ATOMIC_MOVE); + + // Fsync the translog directory after rename + IOUtils.fsync(translogPath, true); } - terminal.println("Translog Generation: " + translogGeneration); - terminal.println("Translog UUID : " + translogUUID); - - Path tempEmptyCheckpoint = translogPath.resolve("temp-" + Translog.CHECKPOINT_FILE_NAME); - Path realEmptyCheckpoint = translogPath.resolve(Translog.CHECKPOINT_FILE_NAME); - Path tempEmptyTranslog = translogPath.resolve("temp-" + Translog.TRANSLOG_FILE_PREFIX + - translogGeneration + Translog.TRANSLOG_FILE_SUFFIX); - Path realEmptyTranslog = translogPath.resolve(Translog.TRANSLOG_FILE_PREFIX + - translogGeneration + Translog.TRANSLOG_FILE_SUFFIX); - - // Write empty checkpoint and translog to empty files - long gen = Long.parseLong(translogGeneration); - int translogLen = writeEmptyTranslog(tempEmptyTranslog, translogUUID); - writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen); - - terminal.println("Removing existing translog files"); - 
-            IOUtils.rm(translogFiles.toArray(new Path[]{}));
-
-            terminal.println("Creating new empty checkpoint at [" + realEmptyCheckpoint + "]");
-            Files.move(tempEmptyCheckpoint, realEmptyCheckpoint, StandardCopyOption.ATOMIC_MOVE);
-            terminal.println("Creating new empty translog at [" + realEmptyTranslog + "]");
-            Files.move(tempEmptyTranslog, realEmptyTranslog, StandardCopyOption.ATOMIC_MOVE);
-
-            // Fsync the translog directory after rename
-            IOUtils.fsync(translogPath, true);
+            terminal.println("Marking index with the new history uuid");
+            // commit the new history id
+            IndexWriterConfig iwc = new IndexWriterConfig(null)
+                .setCommitOnClose(false)
+                // we don't want merges to happen here - we call maybe merge on the engine
+                // later once we started it up otherwise we would need to wait for it here
+                // we also don't specify a codec here and merges should use the engines for this index
+                .setMergePolicy(NoMergePolicy.INSTANCE)
+                .setOpenMode(IndexWriterConfig.OpenMode.APPEND);
+            try (IndexWriter writer = new IndexWriter(dir, iwc)) {
+                Map newCommitData = new HashMap<>(commitData);
+                newCommitData.put(Engine.HISTORY_UUID_KEY, historyUUID);
+                writer.setLiveCommitData(newCommitData.entrySet());
+                writer.commit();
+            }
         } catch (LockObtainFailedException lofe) {
             throw new ElasticsearchException("Failed to lock shard's directory at [" + idxLocation + "], is Elasticsearch still running?");
         }
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 73ab31975684c..70e1ba06b07e3 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -31,6 +31,7 @@ import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
@@ -147,8 +148,8 @@ public RecoveryResponse recoverToTarget() throws IOException {
         final Translog translog = shard.getTranslog();
         final long startingSeqNo;
-        boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
-            isTranslogReadyForSequenceNumberBasedRecovery();
+        final boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
+            isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery();
         if (isSequenceNumberBasedRecoveryPossible) {
starting at [{}]", request.startingSeqNo()); @@ -198,6 +199,13 @@ public RecoveryResponse recoverToTarget() throws IOException { return response; } + private boolean isTargetSameHistory() { + final String targetHistoryUUID = request.metadataSnapshot().getHistoryUUID(); + assert targetHistoryUUID != null || shard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1) : + "incoming target history N/A but index was created after or on 6.0.0-rc1"; + return targetHistoryUUID != null && targetHistoryUUID.equals(shard.getHistoryUUID()); + } + private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable) { cancellableThreads.execute(() -> { final PlainActionFuture onAcquired = new PlainActionFuture<>(); diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java b/core/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java index 57ea19ff298d9..cfdaddabdf15b 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java @@ -75,6 +75,8 @@ public StartRecoveryRequest(final ShardId shardId, this.metadataSnapshot = metadataSnapshot; this.primaryRelocation = primaryRelocation; this.startingSeqNo = startingSeqNo; + assert startingSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || metadataSnapshot.getHistoryUUID() != null : + "starting seq no is set but not history uuid"; } public long recoveryId() { diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 69e1631d7db54..0ea47392d5c21 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2810,6 +2810,44 @@ public void testRecoverFromForeignTranslog() throws IOException { assertVisibleCount(engine, numDocs, false); } + public void testRecoverFromStoreSetsHistoryUUIDIfNeeded() throws IOException { + final int numDocs = randomIntBetween(0, 3); + for (int i = 0; i < numDocs; i++) { + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); + Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); + Engine.IndexResult index = engine.index(firstIndexRequest); + assertThat(index.getVersion(), equalTo(1L)); + } + assertVisibleCount(engine, numDocs); + engine.close(); + + IndexWriterConfig iwc = new IndexWriterConfig(null) + .setCommitOnClose(false) + // we don't want merges to happen here - we call maybe merge on the engine + // later once we stared it up otherwise we would need to wait for it here + // we also don't specify a codec here and merges should use the engines for this index + .setMergePolicy(NoMergePolicy.INSTANCE) + .setOpenMode(IndexWriterConfig.OpenMode.APPEND); + try (IndexWriter writer = new IndexWriter(store.directory(), iwc)) { + Map newCommitData = new HashMap<>(); + for (Map.Entry entry: writer.getLiveCommitData()) { + if (entry.getKey().equals(Engine.HISTORY_UUID_KEY) == false) { + newCommitData.put(entry.getKey(), entry.getValue()); + } + } + writer.setLiveCommitData(newCommitData.entrySet()); + writer.commit(); + } + + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder() + 
+            .put(defaultSettings.getSettings())
+            .put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_6_0_0_beta1)
+            .build());
+        engine = createEngine(indexSettings, store, primaryTranslogDir, newMergePolicy(), null);
+        assertVisibleCount(engine, numDocs, false);
+        assertThat(engine.getHistoryUUID(), notNullValue());
+    }
+
     public void testShardNotAvailableExceptionWhenEngineClosedConcurrently() throws IOException, InterruptedException {
         AtomicReference exception = new AtomicReference<>();
         String operation = randomFrom("optimize", "refresh", "flush");
diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
index c38d3434c3b8f..93ebb319063f1 100644
--- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
+++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
@@ -315,7 +315,7 @@ private synchronized IndexShardRoutingTable routingTable(Function, TranslogWriter> createReadersAndWriter(final
         TranslogWriter writer = null;
         List readers = new ArrayList<>();
         final int numberOfReaders = randomIntBetween(0, 10);
+        final String translogUUID = UUIDs.randomBase64UUID(random());
         for (long gen = 1; gen <= numberOfReaders + 1; gen++) {
             if (writer != null) {
                 final TranslogReader reader = Mockito.spy(writer.closeIntoReader());
                 Mockito.doReturn(writer.getLastModifiedTime()).when(reader).getLastModifiedTime();
                 readers.add(reader);
             }
-            writer = TranslogWriter.create(new ShardId("index", "uuid", 0), "translog_uuid", gen,
+            writer = TranslogWriter.create(new ShardId("index", "uuid", 0), translogUUID, gen,
                 tempDir.resolve(Translog.getFilename(gen)), FileChannel::open, TranslogConfig.DEFAULT_BUFFER_SIZE, () -> 1L, 1L, () -> 1L
             );
             writer = Mockito.spy(writer);
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java
index 46761698610a5..d57373ebfe349 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java
@@ -89,7 +89,7 @@ public TranslogReader openReader(final Path path, final long id) throws IOExcept
         final long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
         final long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
         final Checkpoint checkpoint =
-            new Checkpoint(Files.size(path), 1, id, minSeqNo, maxSeqNo, SequenceNumbers.UNASSIGNED_SEQ_NO, id);
+            new Checkpoint(Files.size(path), 1, id, minSeqNo, maxSeqNo, SequenceNumbers.UNASSIGNED_SEQ_NO, id);
         return TranslogReader.open(channel, path, checkpoint, null);
     }
 }
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java b/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java
index 60434d95e6209..c2b394b219a20 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java
@@ -77,7 +77,6 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.notNullValue;
@@ -144,16 +143,12 @@ public void testCorruptTranslogTruncation() throws Exception {
             }
         }
-        final boolean expectSeqNoRecovery;
         if (randomBoolean() && numDocsToTruncate > 0) {
             // flush the replica, so it will have more docs than what the primary will have
             Index index = resolveIndex("test");
             IndexShard replica = internalCluster().getInstance(IndicesService.class, replicaNode).getShardOrNull(new ShardId(index, 0));
             replica.flush(new FlushRequest());
-            expectSeqNoRecovery = false;
-            logger.info("--> ops based recovery disabled by flushing replica");
-        } else {
-            expectSeqNoRecovery = true;
+            logger.info("--> performed extra flushing on replica");
         }
         // shut down the replica node to be tested later
@@ -219,8 +214,7 @@ public void testCorruptTranslogTruncation() throws Exception {
         final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").setActiveOnly(false).get();
         final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get("test").stream()
             .filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get();
-        assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(),
-            expectSeqNoRecovery ? equalTo(0) : greaterThan(0));
+        assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0));
     }
     public void testCorruptTranslogTruncationOfReplica() throws Exception {
diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java
index b69fa1321ed37..524795bfa2480 100644
--- a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java
@@ -21,8 +21,10 @@ import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
+import org.elasticsearch.index.store.Store;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.transport.TransportService;
@@ -39,7 +41,8 @@ public void testDuplicateRecoveries() throws IOException {
             mock(TransportService.class), mock(IndicesService.class),
             new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
         StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10),
-            getFakeDiscoNode("source"), getFakeDiscoNode("target"), null, randomBoolean(), randomLong(), randomLong());
+            getFakeDiscoNode("source"), getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(),
+            SequenceNumbers.UNASSIGNED_SEQ_NO);
         RecoverySourceHandler handler = peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary);
         DelayRecoveryException delayRecoveryException = expectThrows(DelayRecoveryException.class,
             () -> peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary));
diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
index f876f6bf80dbc..835d16117ad60 100644
--- a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
@@ -73,11 +73,11 @@ Path translogLocation() {
         translogLocation.set(replica.getTranslog().location());
+        final Translog translog = replica.getTranslog();
+        final String translogUUID = translog.getTranslogUUID();
         assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
-        final Translog translog = replica.getTranslog();
-        translogLocation.set(
-            writeTranslog(replica.shardId(), translog.getTranslogUUID(), translog.currentFileGeneration(), maxSeqNo - 1));
+        translogLocation.set(writeTranslog(replica.shardId(), translogUUID, translog.currentFileGeneration(), maxSeqNo - 1));
         // commit is good, global checkpoint is at least max *committed* which is NO_OPS_PERFORMED
         assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(0L));
@@ -89,8 +89,7 @@ Path translogLocation() {
         // commit is not good, global checkpoint is below max
         assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
-        translogLocation.set(
-            writeTranslog(replica.shardId(), translog.getTranslogUUID(), translog.currentFileGeneration(), maxSeqNo));
+        translogLocation.set(writeTranslog(replica.shardId(), translogUUID, translog.currentFileGeneration(), maxSeqNo));
         // commit is good, global checkpoint is above max
         assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(localCheckpoint + 1));
diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
index 9f280839e8638..993cc84506498 100644
--- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
@@ -38,6 +38,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.FileSystemUtils;
@@ -96,17 +97,9 @@ public class RecoverySourceHandlerTests extends ESTestCase {
     public void testSendFiles() throws Throwable {
         Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1).
-            put("indices.recovery.concurrent_small_file_streams", 1).build();
+            put("indices.recovery.concurrent_small_file_streams", 1).build();
         final RecoverySettings recoverySettings = new RecoverySettings(settings, service);
-        final StartRecoveryRequest request = new StartRecoveryRequest(
-            shardId,
-            null,
-            new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
-            new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
-            null,
-            randomBoolean(),
-            randomNonNegativeLong(),
-            randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
+        final StartRecoveryRequest request = getStartRecoveryRequest();
         Store store = newStore(createTempDir());
         RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, recoverySettings.getChunkSize().bytesAsInt(), Settings.EMPTY);
@@ -151,19 +144,26 @@ public void close() throws IOException {
         IOUtils.close(reader, store, targetStore);
     }
-    public void testSendSnapshotSendsOps() throws IOException {
-        final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
-        final int fileChunkSizeInBytes = recoverySettings.getChunkSize().bytesAsInt();
-        final long startingSeqNo = randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomIntBetween(0, 16);
-        final StartRecoveryRequest request = new StartRecoveryRequest(
+    public StartRecoveryRequest getStartRecoveryRequest() throws IOException {
+        Store.MetadataSnapshot metadataSnapshot = randomBoolean() ? Store.MetadataSnapshot.EMPTY :
+            new Store.MetadataSnapshot(Collections.emptyMap(),
+                Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()), randomIntBetween(0, 100));
+        return new StartRecoveryRequest(
             shardId,
             null,
             new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
             new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
-            null,
+            metadataSnapshot,
             randomBoolean(),
             randomNonNegativeLong(),
-            randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
+            randomBoolean() || metadataSnapshot.getHistoryUUID() == null ?
+                SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
+    }
+
+    public void testSendSnapshotSendsOps() throws IOException {
+        final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
+        final int fileChunkSizeInBytes = recoverySettings.getChunkSize().bytesAsInt();
+        final StartRecoveryRequest request = getStartRecoveryRequest();
         final IndexShard shard = mock(IndexShard.class);
         when(shard.state()).thenReturn(IndexShardState.STARTED);
         final RecoveryTargetHandler recoveryTarget = mock(RecoveryTargetHandler.class);
@@ -181,6 +181,7 @@ public void testSendSnapshotSendsOps() throws IOException {
             operations.add(new Translog.Index(index, new Engine.IndexResult(1, i - initialNumberOfDocs, true)));
         }
         operations.add(null);
+        final long startingSeqNo = randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomIntBetween(0, 16);
         RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, new Translog.Snapshot() {
             @Override
             public void close() {
@@ -226,18 +227,9 @@ private Engine.Index getIndex(final String id) {
     public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable {
         Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1).
-            put("indices.recovery.concurrent_small_file_streams", 1).build();
+            put("indices.recovery.concurrent_small_file_streams", 1).build();
         final RecoverySettings recoverySettings = new RecoverySettings(settings, service);
-        final StartRecoveryRequest request =
-            new StartRecoveryRequest(
-                shardId,
-                null,
-                new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
-                new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
-                null,
-                randomBoolean(),
-                randomNonNegativeLong(),
-                randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : 0L);
+        final StartRecoveryRequest request = getStartRecoveryRequest();
         Path tempDir = createTempDir();
         Store store = newStore(tempDir, false);
         AtomicBoolean failedEngine = new AtomicBoolean(false);
@@ -268,8 +260,8 @@ protected void failEngine(IOException cause) {
         }
         CorruptionUtils.corruptFile(random(), FileSystemUtils.files(tempDir, (p) ->
-            (p.getFileName().toString().equals("write.lock") ||
-                p.getFileName().toString().startsWith("extra")) == false));
+            (p.getFileName().toString().equals("write.lock") ||
+                p.getFileName().toString().startsWith("extra")) == false));
         Store targetStore = newStore(createTempDir(), false);
         try {
             handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), (md) -> {
@@ -296,18 +288,9 @@ public void close() throws IOException {
     public void testHandleExceptinoOnSendSendFiles() throws Throwable {
         Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1).
-            put("indices.recovery.concurrent_small_file_streams", 1).build();
+            put("indices.recovery.concurrent_small_file_streams", 1).build();
         final RecoverySettings recoverySettings = new RecoverySettings(settings, service);
-        final StartRecoveryRequest request =
-            new StartRecoveryRequest(
-                shardId,
-                null,
-                new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
-                new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
-                null,
-                randomBoolean(),
-                randomNonNegativeLong(),
-                randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : 0L);
+        final StartRecoveryRequest request = getStartRecoveryRequest();
         Path tempDir = createTempDir();
         Store store = newStore(tempDir, false);
         AtomicBoolean failedEngine = new AtomicBoolean(false);
@@ -363,17 +346,7 @@ protected void failEngine(IOException cause) {
     public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOException {
         final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
-        final boolean attemptSequenceNumberBasedRecovery = randomBoolean();
-        final StartRecoveryRequest request =
-            new StartRecoveryRequest(
-                shardId,
-                null,
-                new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
-                new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
-                null,
-                false,
-                randomNonNegativeLong(),
-                attemptSequenceNumberBasedRecovery ? randomNonNegativeLong() : SequenceNumbers.UNASSIGNED_SEQ_NO);
+        final StartRecoveryRequest request = getStartRecoveryRequest();
         final IndexShard shard = mock(IndexShard.class);
         when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class));
         when(shard.segmentStats(anyBoolean())).thenReturn(mock(SegmentsStats.class));
diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
index 48f0c2f839feb..e2314cff014bc 100644
--- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
@@ -19,22 +19,35 @@
 package org.elasticsearch.indices.recovery;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.NoMergePolicy;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
 import org.elasticsearch.index.replication.RecoveryDuringReplicationTests;
 import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.index.translog.TranslogConfig;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
+import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
 public class RecoveryTests extends ESIndexLevelReplicationTestCase {
@@ -54,7 +67,6 @@ public void testTranslogHistoryTransferred() throws Exception {
         }
     }
-
     public void testRetentionPolicyChangeDuringRecovery() throws Exception {
         try (ReplicationGroup shards = createGroup(0)) {
             shards.startPrimary();
@@ -132,4 +144,67 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception {
             assertThat(newReplica.getTranslog().totalOperations(), equalTo(translogOps));
         }
     }
+
+    public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception {
+        try (ReplicationGroup shards = createGroup(1)) {
+            shards.startAll();
+            // index some shared docs
+            final int flushedDocs = 10;
+            final int nonFlushedDocs = randomIntBetween(0, 10);
+            final int numDocs = flushedDocs + nonFlushedDocs;
+            shards.indexDocs(flushedDocs);
+            shards.flush();
+            shards.indexDocs(nonFlushedDocs);
+
+            IndexShard replica = shards.getReplicas().get(0);
+            final String translogUUID = replica.getTranslog().getTranslogUUID();
+            final String historyUUID = replica.getHistoryUUID();
+            Translog.TranslogGeneration translogGeneration = replica.getTranslog().getGeneration();
+            shards.removeReplica(replica);
+            replica.close("test", false);
+            IndexWriterConfig iwc = new IndexWriterConfig(null)
+                .setCommitOnClose(false)
+                // we don't want merges to happen here - we call maybe merge on the engine
+                // later once we started it up otherwise we would need to wait for it here
+                // we also don't specify a codec here and merges should use the engines for this index
+                .setMergePolicy(NoMergePolicy.INSTANCE)
+                .setOpenMode(IndexWriterConfig.OpenMode.APPEND);
+            Map userData = new HashMap<>(replica.store().readLastCommittedSegmentsInfo().getUserData());
+            final String translogUUIDtoUse;
+            final long translogGenToUse;
+            final String historyUUIDtoUse = UUIDs.randomBase64UUID(random());
+            if (randomBoolean()) {
+                // create a new translog
+                final TranslogConfig translogConfig =
+                    new TranslogConfig(replica.shardId(), replica.shardPath().resolveTranslog(), replica.indexSettings(),
+                        BigArrays.NON_RECYCLING_INSTANCE);
+                try (Translog translog = new Translog(translogConfig, null, createTranslogDeletionPolicy(), () -> flushedDocs)) {
+                    translogUUIDtoUse = translog.getTranslogUUID();
+                    translogGenToUse = translog.currentFileGeneration();
+                }
+            } else {
+                translogUUIDtoUse = translogGeneration.translogUUID;
+                translogGenToUse = translogGeneration.translogFileGeneration;
+            }
+            try (IndexWriter writer = new IndexWriter(replica.store().directory(), iwc)) {
+                userData.put(Engine.HISTORY_UUID_KEY, historyUUIDtoUse);
+                userData.put(Translog.TRANSLOG_UUID_KEY, translogUUIDtoUse);
+                userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGenToUse));
+                writer.setLiveCommitData(userData.entrySet());
+                writer.commit();
+            }
+            replica.store().close();
+            IndexShard newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId());
+            shards.recoverReplica(newReplica);
+            // file based recovery should be made
+            assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
+            assertThat(newReplica.getTranslog().totalOperations(), equalTo(numDocs));
+
+            // history uuid was restored
+            assertThat(newReplica.getHistoryUUID(), equalTo(historyUUID));
+            assertThat(newReplica.commitStats().getUserData().get(Engine.HISTORY_UUID_KEY), equalTo(historyUUID));
+
+            shards.assertAllEqual(numDocs);
+        }
+    }
 }
diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java
index b478243392e1b..14799687d232b 100644
--- a/core/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java
@@ -24,6 +24,7 @@
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.io.stream.InputStreamStreamInput;
 import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.store.Store;
@@ -31,6 +32,7 @@
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.util.Collections;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
@@ -41,6 +43,9 @@ public class StartRecoveryRequestTests extends ESTestCase {
     public void testSerialization() throws Exception {
         final Version targetNodeVersion = randomVersion(random());
+        Store.MetadataSnapshot metadataSnapshot = randomBoolean() ? Store.MetadataSnapshot.EMPTY :
+            new Store.MetadataSnapshot(Collections.emptyMap(),
+                Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()), randomIntBetween(0, 100));
         final StartRecoveryRequest outRequest = new StartRecoveryRequest(
             new ShardId("test", "_na_", 0),
             UUIDs.randomBase64UUID(),
@@ -49,7 +54,8 @@ public void testSerialization() throws Exception {
             Store.MetadataSnapshot.EMPTY,
             randomBoolean(),
             randomNonNegativeLong(),
-            randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
+            randomBoolean() || metadataSnapshot.getHistoryUUID() == null ?
+                SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
         final ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
         final OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer);
diff --git a/docs/reference/indices/flush.asciidoc b/docs/reference/indices/flush.asciidoc
index e9cac91d740cc..0c75fd011b418 100644
--- a/docs/reference/indices/flush.asciidoc
+++ b/docs/reference/indices/flush.asciidoc
@@ -96,6 +96,7 @@ which returns something similar to:
        "generation" : 2,
        "user_data" : {
          "translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA",
+         "history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ",
          "local_checkpoint" : "-1",
          "translog_generation" : "1",
          "max_seq_no" : "-1",
@@ -117,6 +118,7 @@ which returns something similar to:
 --------------------------------------------------
 // TESTRESPONSE[s/"id" : "3M3zkw2GHMo2Y4h4\/KFKCg=="/"id": $body.indices.twitter.shards.0.0.commit.id/]
 // TESTRESPONSE[s/"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA"/"translog_uuid": $body.indices.twitter.shards.0.0.commit.user_data.translog_uuid/]
+// TESTRESPONSE[s/"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ"/"history_uuid": $body.indices.twitter.shards.0.0.commit.user_data.history_uuid/]
 // TESTRESPONSE[s/"sync_id" : "AVvFY-071siAOuFGEO9P"/"sync_id": $body.indices.twitter.shards.0.0.commit.user_data.sync_id/]
 // TESTRESPONSE[s/"1": \.\.\./"1": $body.indices.twitter.shards.1/]
 // TESTRESPONSE[s/"2": \.\.\./"2": $body.indices.twitter.shards.2/]
diff --git a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java
index 323ecd9ae9d22..c7e708418c92c 100644
--- a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java
+++ b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java
@@ -33,6 +33,7 @@
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.elasticsearch.test.NotEqualMessageBuilder;
 import org.elasticsearch.test.rest.ESRestTestCase;
+import org.elasticsearch.test.rest.yaml.ObjectPath;
 import org.junit.Before;
 import java.io.IOException;
@@ -52,6 +53,7 @@
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.notNullValue;
 /**
  * Tests to run before and after a full cluster restart. This is run twice,
@@ -761,6 +763,39 @@ public void testSnapshotRestore() throws IOException {
         }
     }
+    public void testHistoryUUIDIsAdded() throws Exception {
+        if (runningAgainstOldCluster) {
+            XContentBuilder mappingsAndSettings = jsonBuilder();
+            mappingsAndSettings.startObject();
+            {
+                mappingsAndSettings.startObject("settings");
+                mappingsAndSettings.field("number_of_shards", 1);
+                mappingsAndSettings.field("number_of_replicas", 1);
+                mappingsAndSettings.endObject();
+            }
+            mappingsAndSettings.endObject();
+            client().performRequest("PUT", "/" + index, Collections.emptyMap(),
+                new StringEntity(mappingsAndSettings.string(), ContentType.APPLICATION_JSON));
+        } else {
+            Response response = client().performRequest("GET", index + "/_stats", singletonMap("level", "shards"));
+            List shardStats = ObjectPath.createFromResponse(response).evaluate("indices." + index + ".shards.0");
+            String globalHistoryUUID = null;
+            for (Object shard : shardStats) {
+                final String nodeId = ObjectPath.evaluate(shard, "routing.node");
+                final Boolean primary = ObjectPath.evaluate(shard, "routing.primary");
+                logger.info("evaluating: {} , {}", ObjectPath.evaluate(shard, "routing"), ObjectPath.evaluate(shard, "commit"));
+                String historyUUID = ObjectPath.evaluate(shard, "commit.user_data.history_uuid");
+                assertThat("no history uuid found on " + nodeId + " (primary: " + primary + ")", historyUUID, notNullValue());
+                if (globalHistoryUUID == null) {
+                    globalHistoryUUID = historyUUID;
+                } else {
+                    assertThat("history uuid mismatch on " + nodeId + " (primary: " + primary + ")", historyUUID,
+                        equalTo(globalHistoryUUID));
+                }
+            }
+        }
+    }
+
     private void checkSnapshot(String snapshotName, int count, Version tookOnVersion) throws IOException {
         // Check the snapshot metadata, especially the version
         String response = toStr(client().performRequest("GET", "/_snapshot/repo/" + snapshotName, listSnapshotVerboseParams()));