diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 755d9db68b027..31e0d1e422acf 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1421,7 +1421,7 @@ public interface Warmer { * @param primaryTerm the shards primary term this engine was created for * @return the number of no-ops added */ - public abstract int fillSequenceNumberHistory(long primaryTerm) throws IOException; + public abstract int fillSeqNoGaps(long primaryTerm) throws IOException; /** * Performs recovery from the transaction log. diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 5d494ef7ff705..9bfbb1467a20f 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -177,15 +177,6 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { logger.trace("recovered [{}]", seqNoStats); seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats); updateMaxUnsafeAutoIdTimestampFromWriter(writer); - // norelease - /* - * We have no guarantees that all operations above the local checkpoint are in the Lucene commit or the translog. This means - * that we there might be operations greater than the local checkpoint that will not be replayed. Here we force the local - * checkpoint to the maximum sequence number in the commit (at the potential expense of correctness). - */ - while (seqNoService().getLocalCheckpoint() < seqNoService().getMaxSeqNo()) { - seqNoService().markSeqNoAsCompleted(seqNoService().getLocalCheckpoint() + 1); - } indexWriter = writer; translog = openTranslog(engineConfig, writer, () -> seqNoService().getGlobalCheckpoint()); assert translog.getGeneration() != null; @@ -226,21 +217,20 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { } @Override - public int fillSequenceNumberHistory(long primaryTerm) throws IOException { - try (ReleasableLock lock = writeLock.acquire()) { + public int fillSeqNoGaps(long primaryTerm) throws IOException { + try (ReleasableLock ignored = writeLock.acquire()) { ensureOpen(); - final long localCheckpoint = seqNoService.getLocalCheckpoint(); - final long maxSeqId = seqNoService.getMaxSeqNo(); + final long localCheckpoint = seqNoService().getLocalCheckpoint(); + final long maxSeqNo = seqNoService().getMaxSeqNo(); int numNoOpsAdded = 0; - for (long seqNo = localCheckpoint + 1; seqNo <= maxSeqId; - // the local checkpoint might have been advanced so we are leap-frogging - // to the next seq ID we need to process and create a noop for - seqNo = seqNoService.getLocalCheckpoint()+1) { - final NoOp noOp = new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling up seqNo history"); - innerNoOp(noOp); + for ( + long seqNo = localCheckpoint + 1; + seqNo <= maxSeqNo; + seqNo = seqNoService().getLocalCheckpoint() + 1 /* the local checkpoint might have advanced so we leap-frog */) { + innerNoOp(new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling gaps")); numNoOpsAdded++; - assert seqNo <= seqNoService.getLocalCheckpoint() : "localCheckpoint didn't advanced used to be " + seqNo + " now it's on:" - + seqNoService.getLocalCheckpoint(); + assert seqNo <= seqNoService().getLocalCheckpoint() + : "local checkpoint did not advance; was [" + seqNo + "], now [" + seqNoService().getLocalCheckpoint() + "]"; } return numNoOpsAdded; diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 5d5e17c19291b..b2e9416564059 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -365,7 +365,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe } indexShard.performTranslogRecovery(indexShouldExists); assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; - indexShard.getEngine().fillSequenceNumberHistory(indexShard.getPrimaryTerm()); + indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm()); } indexShard.finalizeRecovery(); indexShard.postRecovery("post recovery from shard_store"); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f1b981e14d220..2a611adb7635f 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -114,7 +114,6 @@ import org.elasticsearch.index.mapper.RootObjectMapper; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; -import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -3580,6 +3579,8 @@ public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws Bro try (Engine recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) { + recoveringEngine.recoverFromTranslog(); + recoveringEngine.fillSeqNoGaps(2); assertThat(recoveringEngine.seqNoService().getLocalCheckpoint(), greaterThanOrEqualTo((long) (docs - 1))); } } @@ -3618,6 +3619,8 @@ public void testSequenceNumberAdvancesToMaxSeqNoOnEngineOpenOnReplica() throws I try (Engine recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) { + recoveringEngine.recoverFromTranslog(); + recoveringEngine.fillSeqNoGaps(1); assertThat(recoveringEngine.seqNoService().getLocalCheckpoint(), greaterThanOrEqualTo((long) (3 * (docs - 1) + 2 - 1))); } } @@ -3719,21 +3722,35 @@ public long generateSeqNo() { throw new UnsupportedOperationException(); } }; - noOpEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, () -> seqNoService); + noOpEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) { + @Override + public SequenceNumbersService seqNoService() { + return seqNoService; + } + }; + noOpEngine.recoverFromTranslog(); final long primaryTerm = randomNonNegativeLong(); + final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm); final String reason = randomAlphaOfLength(16); noOpEngine.noOp( - new Engine.NoOp( - maxSeqNo + 1, - primaryTerm, - randomFrom(PRIMARY, REPLICA, PEER_RECOVERY, LOCAL_TRANSLOG_RECOVERY), - System.nanoTime(), - reason)); + new Engine.NoOp( + maxSeqNo + 1, + primaryTerm, + randomFrom(PRIMARY, REPLICA, PEER_RECOVERY, LOCAL_TRANSLOG_RECOVERY), + System.nanoTime(), + reason)); assertThat(noOpEngine.seqNoService().getLocalCheckpoint(), equalTo((long) (maxSeqNo + 1))); - assertThat(noOpEngine.getTranslog().totalOperations(), equalTo(1)); - final Translog.Operation op = noOpEngine.getTranslog().newSnapshot().next(); - assertThat(op, instanceOf(Translog.NoOp.class)); - final Translog.NoOp noOp = (Translog.NoOp) op; + assertThat(noOpEngine.getTranslog().totalOperations(), equalTo(1 + gapsFilled)); + // skip to the op that we added to the translog + Translog.Operation op; + Translog.Operation last = null; + final Translog.Snapshot snapshot = noOpEngine.getTranslog().newSnapshot(); + while ((op = snapshot.next()) != null) { + last = op; + } + assertNotNull(last); + assertThat(last, instanceOf(Translog.NoOp.class)); + final Translog.NoOp noOp = (Translog.NoOp) last; assertThat(noOp.seqNo(), equalTo((long) (maxSeqNo + 1))); assertThat(noOp.primaryTerm(), equalTo(primaryTerm)); assertThat(noOp.reason(), equalTo(reason)); @@ -3846,7 +3863,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { for (int i = 0; i < docs; i++) { final String docId = Integer.toString(i); final ParsedDocument doc = - testParsedDocument(docId, "test", null, testDocumentWithTextField(), SOURCE, null); + testParsedDocument(docId, "test", null, testDocumentWithTextField(), SOURCE, null); Engine.Index primaryResponse = indexForDoc(doc); Engine.IndexResult indexResult = engine.index(primaryResponse); if (randomBoolean()) { @@ -3864,8 +3881,8 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { boolean flushed = false; Engine recoveringEngine = null; try { - assertEquals(docs-1, engine.seqNoService().getMaxSeqNo()); - assertEquals(docs-1, engine.seqNoService().getLocalCheckpoint()); + assertEquals(docs - 1, engine.seqNoService().getMaxSeqNo()); + assertEquals(docs - 1, engine.seqNoService().getLocalCheckpoint()); assertEquals(maxSeqIDOnReplica, replicaEngine.seqNoService().getMaxSeqNo()); assertEquals(checkpointOnReplica, replicaEngine.seqNoService().getLocalCheckpoint()); recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); @@ -3873,13 +3890,13 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { recoveringEngine.recoverFromTranslog(); assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); assertEquals(checkpointOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); - assertEquals((maxSeqIDOnReplica+1) - numDocsOnReplica, recoveringEngine.fillSequenceNumberHistory(2)); + assertEquals((maxSeqIDOnReplica + 1) - numDocsOnReplica, recoveringEngine.fillSeqNoGaps(2)); // now snapshot the tlog and ensure the primary term is updated Translog.Snapshot snapshot = recoveringEngine.getTranslog().newSnapshot(); - assertTrue((maxSeqIDOnReplica+1) - numDocsOnReplica <= snapshot.totalOperations()); + assertTrue((maxSeqIDOnReplica + 1) - numDocsOnReplica <= snapshot.totalOperations()); Translog.Operation operation; - while((operation = snapshot.next()) != null) { + while ((operation = snapshot.next()) != null) { if (operation.opType() == Translog.Operation.Type.NO_OP) { assertEquals(2, operation.primaryTerm()); } else { @@ -3905,7 +3922,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { recoveringEngine.recoverFromTranslog(); assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); - assertEquals(0, recoveringEngine.fillSequenceNumberHistory(3)); + assertEquals(0, recoveringEngine.fillSeqNoGaps(3)); assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); } finally {