From 7602ad6dfa9630f874ebf83357ee7f8a09c97715 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 4 May 2017 21:02:14 -0400 Subject: [PATCH 1/2] Remove gap skipping when opening engine Today, when opening the engine, we skip gaps in the history, advancing the local checkpoint until it is equal to the maximum sequence number contained in the commit. This allows history to advance, but it leaves gaps. A previous change filled these gaps when recovering from store, but since we were skipping the gaps while opening the engine, that previous change had no effect. This commit removes the gap skipping when opening the engine, allowing the gap filling to do its job. --- .../elasticsearch/index/engine/Engine.java | 2 +- .../index/engine/InternalEngine.java | 29 +++------- .../index/shard/StoreRecovery.java | 2 +- .../index/engine/InternalEngineTests.java | 55 ++++++++++++------- 4 files changed, 46 insertions(+), 42 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 755d9db68b027..31e0d1e422acf 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1421,7 +1421,7 @@ public interface Warmer { * @param primaryTerm the shards primary term this engine was created for * @return the number of no-ops added */ - public abstract int fillSequenceNumberHistory(long primaryTerm) throws IOException; + public abstract int fillSeqNoGaps(long primaryTerm) throws IOException; /** * Performs recovery from the transaction log. 
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 5d494ef7ff705..6bd69a267932a 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -177,15 +177,6 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { logger.trace("recovered [{}]", seqNoStats); seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats); updateMaxUnsafeAutoIdTimestampFromWriter(writer); - // norelease - /* - * We have no guarantees that all operations above the local checkpoint are in the Lucene commit or the translog. This means - * that we there might be operations greater than the local checkpoint that will not be replayed. Here we force the local - * checkpoint to the maximum sequence number in the commit (at the potential expense of correctness). 
- */ - while (seqNoService().getLocalCheckpoint() < seqNoService().getMaxSeqNo()) { - seqNoService().markSeqNoAsCompleted(seqNoService().getLocalCheckpoint() + 1); - } indexWriter = writer; translog = openTranslog(engineConfig, writer, () -> seqNoService().getGlobalCheckpoint()); assert translog.getGeneration() != null; @@ -226,21 +217,17 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { } @Override - public int fillSequenceNumberHistory(long primaryTerm) throws IOException { - try (ReleasableLock lock = writeLock.acquire()) { + public int fillSeqNoGaps(long primaryTerm) throws IOException { + try (ReleasableLock ignored = writeLock.acquire()) { ensureOpen(); - final long localCheckpoint = seqNoService.getLocalCheckpoint(); - final long maxSeqId = seqNoService.getMaxSeqNo(); + final long localCheckpoint = seqNoService().getLocalCheckpoint(); + final long maxSeqNo = seqNoService().getMaxSeqNo(); int numNoOpsAdded = 0; - for (long seqNo = localCheckpoint + 1; seqNo <= maxSeqId; - // the local checkpoint might have been advanced so we are leap-frogging - // to the next seq ID we need to process and create a noop for - seqNo = seqNoService.getLocalCheckpoint()+1) { - final NoOp noOp = new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling up seqNo history"); - innerNoOp(noOp); + for (long seqNo = localCheckpoint + 1; seqNo <= maxSeqNo; seqNo = seqNoService().getLocalCheckpoint() + 1) { + innerNoOp(new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling gaps")); numNoOpsAdded++; - assert seqNo <= seqNoService.getLocalCheckpoint() : "localCheckpoint didn't advanced used to be " + seqNo + " now it's on:" - + seqNoService.getLocalCheckpoint(); + assert seqNo <= seqNoService().getLocalCheckpoint() + : "local checkpoint did not advance; was [" + seqNo + "], now [" + seqNoService().getLocalCheckpoint() + "]"; } return numNoOpsAdded; diff --git 
a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 5d5e17c19291b..b2e9416564059 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -365,7 +365,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe } indexShard.performTranslogRecovery(indexShouldExists); assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; - indexShard.getEngine().fillSequenceNumberHistory(indexShard.getPrimaryTerm()); + indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm()); } indexShard.finalizeRecovery(); indexShard.postRecovery("post recovery from shard_store"); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f1b981e14d220..2a611adb7635f 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -114,7 +114,6 @@ import org.elasticsearch.index.mapper.RootObjectMapper; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; -import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -3580,6 +3579,8 @@ public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws Bro try (Engine recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) { + recoveringEngine.recoverFromTranslog(); + recoveringEngine.fillSeqNoGaps(2); assertThat(recoveringEngine.seqNoService().getLocalCheckpoint(), greaterThanOrEqualTo((long) (docs - 
1))); } } @@ -3618,6 +3619,8 @@ public void testSequenceNumberAdvancesToMaxSeqNoOnEngineOpenOnReplica() throws I try (Engine recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) { + recoveringEngine.recoverFromTranslog(); + recoveringEngine.fillSeqNoGaps(1); assertThat(recoveringEngine.seqNoService().getLocalCheckpoint(), greaterThanOrEqualTo((long) (3 * (docs - 1) + 2 - 1))); } } @@ -3719,21 +3722,35 @@ public long generateSeqNo() { throw new UnsupportedOperationException(); } }; - noOpEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, () -> seqNoService); + noOpEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) { + @Override + public SequenceNumbersService seqNoService() { + return seqNoService; + } + }; + noOpEngine.recoverFromTranslog(); final long primaryTerm = randomNonNegativeLong(); + final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm); final String reason = randomAlphaOfLength(16); noOpEngine.noOp( - new Engine.NoOp( - maxSeqNo + 1, - primaryTerm, - randomFrom(PRIMARY, REPLICA, PEER_RECOVERY, LOCAL_TRANSLOG_RECOVERY), - System.nanoTime(), - reason)); + new Engine.NoOp( + maxSeqNo + 1, + primaryTerm, + randomFrom(PRIMARY, REPLICA, PEER_RECOVERY, LOCAL_TRANSLOG_RECOVERY), + System.nanoTime(), + reason)); assertThat(noOpEngine.seqNoService().getLocalCheckpoint(), equalTo((long) (maxSeqNo + 1))); - assertThat(noOpEngine.getTranslog().totalOperations(), equalTo(1)); - final Translog.Operation op = noOpEngine.getTranslog().newSnapshot().next(); - assertThat(op, instanceOf(Translog.NoOp.class)); - final Translog.NoOp noOp = (Translog.NoOp) op; + assertThat(noOpEngine.getTranslog().totalOperations(), equalTo(1 + gapsFilled)); + // skip to the op that we added to the translog + Translog.Operation op; + Translog.Operation last = null; + final Translog.Snapshot snapshot = noOpEngine.getTranslog().newSnapshot(); 
+ while ((op = snapshot.next()) != null) { + last = op; + } + assertNotNull(last); + assertThat(last, instanceOf(Translog.NoOp.class)); + final Translog.NoOp noOp = (Translog.NoOp) last; assertThat(noOp.seqNo(), equalTo((long) (maxSeqNo + 1))); assertThat(noOp.primaryTerm(), equalTo(primaryTerm)); assertThat(noOp.reason(), equalTo(reason)); @@ -3846,7 +3863,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { for (int i = 0; i < docs; i++) { final String docId = Integer.toString(i); final ParsedDocument doc = - testParsedDocument(docId, "test", null, testDocumentWithTextField(), SOURCE, null); + testParsedDocument(docId, "test", null, testDocumentWithTextField(), SOURCE, null); Engine.Index primaryResponse = indexForDoc(doc); Engine.IndexResult indexResult = engine.index(primaryResponse); if (randomBoolean()) { @@ -3864,8 +3881,8 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { boolean flushed = false; Engine recoveringEngine = null; try { - assertEquals(docs-1, engine.seqNoService().getMaxSeqNo()); - assertEquals(docs-1, engine.seqNoService().getLocalCheckpoint()); + assertEquals(docs - 1, engine.seqNoService().getMaxSeqNo()); + assertEquals(docs - 1, engine.seqNoService().getLocalCheckpoint()); assertEquals(maxSeqIDOnReplica, replicaEngine.seqNoService().getMaxSeqNo()); assertEquals(checkpointOnReplica, replicaEngine.seqNoService().getLocalCheckpoint()); recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); @@ -3873,13 +3890,13 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { recoveringEngine.recoverFromTranslog(); assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); assertEquals(checkpointOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); - assertEquals((maxSeqIDOnReplica+1) - numDocsOnReplica, recoveringEngine.fillSequenceNumberHistory(2)); + assertEquals((maxSeqIDOnReplica + 1) - 
numDocsOnReplica, recoveringEngine.fillSeqNoGaps(2)); // now snapshot the tlog and ensure the primary term is updated Translog.Snapshot snapshot = recoveringEngine.getTranslog().newSnapshot(); - assertTrue((maxSeqIDOnReplica+1) - numDocsOnReplica <= snapshot.totalOperations()); + assertTrue((maxSeqIDOnReplica + 1) - numDocsOnReplica <= snapshot.totalOperations()); Translog.Operation operation; - while((operation = snapshot.next()) != null) { + while ((operation = snapshot.next()) != null) { if (operation.opType() == Translog.Operation.Type.NO_OP) { assertEquals(2, operation.primaryTerm()); } else { @@ -3905,7 +3922,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { recoveringEngine.recoverFromTranslog(); assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); - assertEquals(0, recoveringEngine.fillSequenceNumberHistory(3)); + assertEquals(0, recoveringEngine.fillSeqNoGaps(3)); assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); } finally { From 61a209339315dfdd3d40d03f0e2127107d7911c1 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 8 May 2017 06:37:35 -0400 Subject: [PATCH 2/2] Revert comment removal --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 6bd69a267932a..9bfbb1467a20f 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -223,7 +223,10 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException { final long localCheckpoint = seqNoService().getLocalCheckpoint(); 
final long maxSeqNo = seqNoService().getMaxSeqNo(); int numNoOpsAdded = 0; - for (long seqNo = localCheckpoint + 1; seqNo <= maxSeqNo; seqNo = seqNoService().getLocalCheckpoint() + 1) { + for ( + long seqNo = localCheckpoint + 1; + seqNo <= maxSeqNo; + seqNo = seqNoService().getLocalCheckpoint() + 1 /* the local checkpoint might have advanced so we leap-frog */) { innerNoOp(new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling gaps")); numNoOpsAdded++; assert seqNo <= seqNoService().getLocalCheckpoint()