From 64b8fdde1e6dbe83ab96debc53ee2bb9fc0a9be6 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Wed, 20 Mar 2019 11:49:39 +0100 Subject: [PATCH] Cascading primary failure led to MSU too low If a replica were first reset due to one primary failover and then promoted (before resync completes), its MSU would not include changes since global checkpoint, leading to errors during translog replay. Fixed by re-initializing MSU before restoring local history. --- .../elasticsearch/index/engine/Engine.java | 6 +-- .../index/engine/InternalEngine.java | 4 +- .../index/engine/ReadOnlyEngine.java | 2 +- .../elasticsearch/index/shard/IndexShard.java | 4 +- .../index/engine/InternalEngineTests.java | 50 +++++++++---------- .../index/engine/ReadOnlyEngineTests.java | 4 +- .../index/shard/IndexShardTests.java | 30 +++++++---- .../index/shard/RefreshListenersTests.java | 2 +- .../index/engine/EngineTestCase.java | 2 +- .../index/engine/FollowingEngineTests.java | 4 +- 10 files changed, 58 insertions(+), 50 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 55b1ce6e6992d..edea3952ce4c5 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1961,7 +1961,7 @@ public interface TranslogRecoveryRunner { * Moreover, operations that are optimized using the MSU optimization must not be processed twice as this will create duplicates * in Lucene. To avoid this we check the local checkpoint tracker to see if an operation was already processed. 
* - * @see #initializeMaxSeqNoOfUpdatesOrDeletes() + * @see #reinitializeMaxSeqNoOfUpdatesOrDeletes() * @see #advanceMaxSeqNoOfUpdatesOrDeletes(long) */ public final long getMaxSeqNoOfUpdatesOrDeletes() { @@ -1969,10 +1969,10 @@ public final long getMaxSeqNoOfUpdatesOrDeletes() { } /** - * A primary shard calls this method once to initialize the max_seq_no_of_updates marker using the + * A primary shard calls this method to re-initialize the max_seq_no_of_updates marker using the * max_seq_no from Lucene index and translog before replaying the local translog in its local recovery. */ - public abstract void initializeMaxSeqNoOfUpdatesOrDeletes(); + public abstract void reinitializeMaxSeqNoOfUpdatesOrDeletes(); /** * A replica shard receives a new max_seq_no_of_updates from its primary shard, then calls this method diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 7bf6f5ce3f180..ace5495a26a96 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2741,9 +2741,7 @@ private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean a } @Override - public void initializeMaxSeqNoOfUpdatesOrDeletes() { - assert getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO : - "max_seq_no_of_updates is already initialized [" + getMaxSeqNoOfUpdatesOrDeletes() + "]"; + public void reinitializeMaxSeqNoOfUpdatesOrDeletes() { final long maxSeqNo = SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo()); advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNo); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 14336e66ca4f4..cae83927fdbc7 100644 --- 
a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -455,7 +455,7 @@ public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) { } @Override - public void initializeMaxSeqNoOfUpdatesOrDeletes() { + public void reinitializeMaxSeqNoOfUpdatesOrDeletes() { advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats.getMaxSeqNo()); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 99dab5a0f3918..1733f54e4c103 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -536,6 +536,8 @@ public void updateShardState(final ShardRouting newRouting, assert indexSettings.getIndexVersionCreated().before(Version.V_6_5_0); engine.advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo()); } + // in case we previously reset engine, we need to forward MSU before replaying translog. 
+ engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.restoreLocalHistoryFromTranslog((resettingEngine, snapshot) -> runTranslogRecovery(resettingEngine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {})); /* Rolling the translog generation is not strictly needed here (as we will never have collisions between @@ -1394,7 +1396,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException { }; innerOpenEngineAndTranslog(); final Engine engine = getEngine(); - engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index cfdf820ceb59a..c3f9f3c20a49e 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -677,7 +677,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException { trimUnsafeCommits(engine.config()); engine = new InternalEngine(engine.config()); assertTrue(engine.isRecovering()); - engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test")); assertThat(counter.get(), equalTo(2)); @@ -695,7 +695,7 @@ public void testFlushIsDisabledDuringTranslogRecovery() throws IOException { engine = new InternalEngine(engine.config()); expectThrows(IllegalStateException.class, () -> engine.flush(true, true)); assertTrue(engine.isRecovering()); - engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertFalse(engine.isRecovering()); doc = testParsedDocument("2", null, 
testDocumentWithTextField(), SOURCE, null); @@ -728,7 +728,7 @@ public void testTranslogMultipleOperationsSameDocument() throws IOException { } trimUnsafeCommits(engine.config()); try (Engine recoveringEngine = new InternalEngine(engine.config())) { - recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); + recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) { final TotalHitCountCollector collector = new TotalHitCountCollector(); @@ -765,7 +765,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s } }; assertThat(getTranslog(recoveringEngine).stats().getUncommittedOperations(), equalTo(docs)); - recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); + recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertTrue(committed.get()); } finally { @@ -800,7 +800,7 @@ public void testTranslogRecoveryWithMultipleGenerations() throws IOException { initialEngine.close(); trimUnsafeCommits(initialEngine.config()); recoveringEngine = new InternalEngine(initialEngine.config()); - recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); + recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), docs); @@ -836,7 +836,7 @@ public void testRecoveryFromTranslogUpToSeqNo() throws IOException { } trimUnsafeCommits(config); try (InternalEngine engine = new InternalEngine(config)) { - engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertThat(engine.getLocalCheckpoint(), equalTo(maxSeqNo)); 
assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(maxSeqNo)); @@ -844,7 +844,7 @@ public void testRecoveryFromTranslogUpToSeqNo() throws IOException { trimUnsafeCommits(config); try (InternalEngine engine = new InternalEngine(config)) { long upToSeqNo = randomLongBetween(globalCheckpoint.get(), maxSeqNo); - engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, upToSeqNo); assertThat(engine.getLocalCheckpoint(), equalTo(upToSeqNo)); assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(upToSeqNo)); @@ -1261,7 +1261,7 @@ public void testSyncedFlushSurvivesEngineRestart() throws IOException { } trimUnsafeCommits(config); engine = new InternalEngine(config); - engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); } @@ -1282,7 +1282,7 @@ public void testSyncedFlushVanishesOnReplay() throws IOException { engine.close(); trimUnsafeCommits(config); engine = new InternalEngine(config); - engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID)); @@ -2381,7 +2381,7 @@ public void testSeqNoAndCheckpoints() throws IOException { trimUnsafeCommits(initialEngine.engineConfig); try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())) { - recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); + recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(primarySeqNo, 
recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); @@ -2737,7 +2737,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException { assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); } assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); @@ -2756,7 +2756,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(2, engine.getTranslog().currentFileGeneration()); assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations()); @@ -2771,7 +2771,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("no changes - nothing to commit", "1", @@ -2879,7 +2879,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s } } }) { - engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + 
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null); @@ -2892,7 +2892,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s try (InternalEngine engine = new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpointSupplier))) { - engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertVisibleCount(engine, 1); final long committedGen = Long.valueOf( @@ -2963,7 +2963,7 @@ public void testTranslogReplay() throws IOException { trimUnsafeCommits(copy(engine.config(), inSyncGlobalCheckpointSupplier)); // we need to reuse the engine config unless the parser.mappingModified won't work engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier)); - engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertVisibleCount(engine, numDocs, false); @@ -3726,7 +3726,7 @@ public void testEngineMaxTimestampIsInitialized() throws IOException { InternalEngine engine = new InternalEngine(configSupplier.apply(store))) { assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp()); - engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(timestamp1, engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp()); final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), @@ -4094,7 +4094,7 @@ public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws Bro } trimUnsafeCommits(initialEngine.config()); 
try (Engine recoveringEngine = new InternalEngine(initialEngine.config())) { - recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); + recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); recoveringEngine.fillSeqNoGaps(2); assertThat(recoveringEngine.getLocalCheckpoint(), greaterThanOrEqualTo((long) (docs - 1))); @@ -4207,7 +4207,7 @@ protected long doGenerateSeqNoForOperation(Operation operation) { throw new UnsupportedOperationException(); } }; - noOpEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); + noOpEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); noOpEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm.get()); final String reason = "filling gaps"; @@ -4444,7 +4444,7 @@ public void testRestoreLocalHistoryFromTranslog() throws IOException { } trimUnsafeCommits(engineConfig); try (InternalEngine engine = new InternalEngine(engineConfig)) { - engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, globalCheckpoint.get()); engine.restoreLocalHistoryFromTranslog(translogHandler); assertThat(getDocIds(engine, true), equalTo(prevDocs)); @@ -4492,7 +4492,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { trimUnsafeCommits(copy(replicaEngine.config(), globalCheckpoint::get)); recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get)); assertEquals(numDocsOnReplica, getTranslog(recoveringEngine).stats().getUncommittedOperations()); - recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); + recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpoint()); @@ -4529,7 
+4529,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { if (flushed) { assertThat(recoveringEngine.getTranslogStats().getUncommittedOperations(), equalTo(0)); } - recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); + recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint()); @@ -4724,7 +4724,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s super.commitIndexWriter(writer, translog, syncId); } }) { - engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); int numDocs = scaledRandomIntBetween(10, 100); for (int docId = 0; docId < numDocs; docId++) { @@ -5500,7 +5500,7 @@ public void testTrackMaxSeqNoOfUpdatesOrDeletesOnPrimary() throws Exception { Set liveDocIds = new HashSet<>(); engine = new InternalEngine(engine.config()); assertThat(engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-2L)); - engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); int numOps = between(1, 500); for (int i = 0; i < numOps; i++) { long currentMaxSeqNoOfUpdates = engine.getMaxSeqNoOfUpdatesOrDeletes(); @@ -5571,7 +5571,7 @@ public void testRebuildLocalCheckpointTracker() throws Exception { "seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + "checkpoint=" + tracker.getCheckpoint(), tracker.contains(op.seqNo()), equalTo(seqNosInSafeCommit.contains(op.seqNo()))); } - engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertThat(getDocIds(engine, true), equalTo(docs)); } diff --git 
a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java index 0ba33fe668ee3..e19bdc42b0156 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java @@ -96,7 +96,7 @@ public void testReadOnlyEngine() throws Exception { // Close and reopen the main engine InternalEngineTests.trimUnsafeCommits(config); try (InternalEngine recoveringEngine = new InternalEngine(config)) { - recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); + recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); // the locked down engine should still point to the previous commit assertThat(readOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); @@ -235,7 +235,7 @@ public void testRecoverFromTranslogAppliesNoOperations() throws IOException { } try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) { final TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings()); - readOnlyEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); + readOnlyEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); readOnlyEngine.recoverFromTranslog(translogHandler, randomNonNegativeLong()); assertThat(translogHandler.appliedOperations(), equalTo(0L)); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 8e25f85ce5c94..a78b6517ba7b6 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -546,7 +546,7 @@ public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception { // most of the time this is large 
enough that most of the time there will be at least one gap final int operations = 1024 - scaledRandomIntBetween(0, 1024); - final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED)); + final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), false); final int maxSeqNo = result.maxSeqNo; @@ -1093,7 +1093,7 @@ public void testGlobalCheckpointSync() throws IOException { public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, InterruptedException { final IndexShard indexShard = newStartedShard(false); final int operations = 1024 - scaledRandomIntBetween(0, 1024); - indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED)); + indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), true); final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo(); final long globalCheckpointOnReplica = randomLongBetween(UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint()); @@ -1145,9 +1145,9 @@ public void onFailure(Exception e) { assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo)); assertThat(getShardDocUIDs(indexShard), equalTo(docsBeforeRollback)); if (shouldRollback) { - assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Collections.max( - Arrays.asList(currentMaxSeqNoOfUpdates, maxSeqNoOfUpdatesOrDeletes, globalCheckpoint, globalCheckpointOnReplica)) - )); + // we conservatively roll MSU forward to maxSeqNo during restoreLocalHistory, ideally it should become just + // currentMaxSeqNoOfUpdates + assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(maxSeqNo)); } else { assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, maxSeqNoOfUpdatesOrDeletes))); } @@ -1159,7 +1159,9 @@ public void testRollbackReplicaEngineOnPromotion() throws IOException, Interrupt // most of the 
time this is large enough that most of the time there will be at least one gap final int operations = 1024 - scaledRandomIntBetween(0, 1024); - indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED)); + // todo: all tests should run with allowUpdates=true, but this specific test sometimes fails during lucene commit when updates are + // added (seed = F37E9647ABE5928) + indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), false); final long globalCheckpointOnReplica = randomLongBetween(UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint()); indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica, "test"); @@ -1202,7 +1204,7 @@ public void onFailure(final Exception e) { } assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(newMaxSeqNoOfUpdates)); // ensure that after the local checkpoint throw back and indexing again, the local checkpoint advances - final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint())); + final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint()), false); assertThat(indexShard.getLocalCheckpoint(), equalTo((long) result.localCheckpoint)); closeShard(indexShard, false); } @@ -3137,19 +3139,25 @@ class Result { * @param indexShard the shard * @param operations the number of operations * @param offset the starting sequence number + * @param allowUpdates whether updates should be added. 
* @return a pair of the maximum sequence number and whether or not a gap was introduced * @throws IOException if an I/O exception occurs while indexing on the shard */ private Result indexOnReplicaWithGaps( final IndexShard indexShard, final int operations, - final int offset) throws IOException { + final int offset, + boolean allowUpdates) throws IOException { int localCheckpoint = offset; int max = offset; boolean gap = false; + Set ids = new HashSet<>(); for (int i = offset + 1; i < operations; i++) { if (!rarely() || i == operations - 1) { // last operation can't be a gap as it's not a gap anymore - final String id = Integer.toString(i); + final String id = ids.isEmpty() || randomBoolean() ? Integer.toString(i) : randomFrom(ids); + if (allowUpdates && ids.add(id) == false) { // this is an update + indexShard.advanceMaxSeqNoOfUpdatesOrDeletes(i); + } SourceToParse sourceToParse = new SourceToParse(indexShard.shardId().getIndexName(), "_doc", id, new BytesArray("{}"), XContentType.JSON); indexShard.applyIndexOperationOnReplica(i, 1, @@ -3604,7 +3612,7 @@ public void testSupplyTombstoneDoc() throws Exception { public void testResetEngine() throws Exception { IndexShard shard = newStartedShard(false); - indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint())); + indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint()), false); final long globalCheckpoint = randomLongBetween(shard.getGlobalCheckpoint(), shard.getLocalCheckpoint()); shard.updateGlobalCheckpointOnReplica(globalCheckpoint, "test"); Set docBelowGlobalCheckpoint = getShardDocUIDs(shard).stream() @@ -3644,7 +3652,7 @@ public void testResetEngine() throws Exception { public void testConcurrentAcquireAllReplicaOperationsPermitsWithPrimaryTermUpdate() throws Exception { final IndexShard replica = newStartedShard(false); - indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint())); + 
indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint()), false); final int nbTermUpdates = randomIntBetween(1, 5); diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index c80b3b5074921..4ad95c43b7077 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -148,7 +148,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) { () -> primaryTerm, EngineTestCase.tombstoneDocSupplier()); engine = new InternalEngine(config); - engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE); listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index a483e79467b98..2c620a585aa71 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -514,7 +514,7 @@ protected InternalEngine createEngine(@Nullable IndexWriterFactory indexWriterFa } InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config); - internalEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); + internalEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); internalEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); return internalEngine; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java 
b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index b8d44f76026f2..dfac5ef2654b8 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -289,7 +289,7 @@ private FollowingEngine createEngine(Store store, EngineConfig config) throws IO store.associateIndexWithNewTranslog(translogUuid); FollowingEngine followingEngine = new FollowingEngine(config); TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings()); - followingEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); + followingEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); followingEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); return followingEngine; } @@ -495,7 +495,7 @@ private void runFollowTest(CheckedBiConsumer