From 587ea79ba1095fc859e448671548ade02ffe6af2 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 18 Sep 2018 16:22:41 -0400 Subject: [PATCH 01/10] Track max seq_no of updates or deletes on primary This PR is the first step to use seq_no to optimize indexing operations. The idea is to track the max seq_no of either update or delete ops on a primary, and transfer this information to replicas, and replicas use it to optimize plan for indexing operations (with assigned seq_no). The max_seq_no_of_updates on primary is initialized once when a primary finishes its local recovery or peer recovery in relocation or being promoted. After that, the max_seq_no_of_updates is only advanced inside an engine internally when processing an update or a delete operation. --- .../elasticsearch/index/engine/Engine.java | 25 ++++++++++ .../index/engine/InternalEngine.java | 8 +++ .../elasticsearch/index/shard/IndexShard.java | 26 ++++++++++ .../index/engine/InternalEngineTests.java | 50 ++++++++++++++++++- .../index/shard/IndexShardTests.java | 1 + .../index/shard/RefreshListenersTests.java | 1 + .../elasticsearch/indices/flush/FlushIT.java | 3 +- .../index/engine/EngineTestCase.java | 2 + .../index/shard/IndexShardTestCase.java | 2 + 9 files changed, 115 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index fc693113fee53..3d8cebc5a3d3f 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -88,6 +88,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -126,6 +127,9 @@ public abstract class Engine 
implements Closeable { * inactive shards. */ protected volatile long lastWriteNanos = System.nanoTime(); + // The maximum sequence number of either update or delete operations have been processed by this engine. + // This value is started with an unassigned status (-2) and will be initialized from outside. + private final AtomicLong maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); protected Engine(EngineConfig engineConfig) { Objects.requireNonNull(engineConfig.getStore(), "Store must be provided to the engine"); @@ -1694,4 +1698,25 @@ public boolean isRecovering() { public interface TranslogRecoveryRunner { int run(Engine engine, Translog.Snapshot snapshot) throws IOException; } + + /** + * Returns the maximum sequence number of either update or delete operations have been processed + * in this engine or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. + *
<p>
+ * For a primary engine, this value is initialized once, then advanced internally when it processes + * an update or a delete operation. Whereas a replica engine never updates this value by itself but + * only inherits the latest value from its primary. In both cases, this value never goes backwards. + */ + public long getMaxSeqNoOfUpdatesOrDeletes() { + return maxSeqNoOfUpdatesOrDeletes.get(); + } + + /** + * Advances the max_seq_no_of_updates marker of this engine to at least the given sequence number. + * @see #getMaxSeqNoOfUpdatesOrDeletes() + */ + public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { + maxSeqNoOfUpdatesOrDeletes.updateAndGet(curr -> Math.max(curr, seqNo)); + assert maxSeqNoOfUpdatesOrDeletes.get() >= seqNo; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 52dd4d3fcd09e..081d8c26df25f 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -922,11 +922,13 @@ protected IndexingStrategy indexingStrategyForOperation(final Index index) throw protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. 
got " + index.origin(); + assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; final IndexingStrategy plan; // resolve an external operation into an internal one which is safe to replay if (canOptimizeAddDocument(index)) { if (mayHaveBeenIndexedBefore(index)) { plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L); + advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoForIndexing); versionMap.enforceSafeAccess(); } else { plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index)); @@ -954,6 +956,10 @@ protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOExc generateSeqNoForOperation(index), index.versionType().updateVersion(currentVersion, index.version()) ); + final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false; + if (toAppend == false) { + advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoForIndexing); + } } } return plan; @@ -1245,6 +1251,7 @@ protected boolean assertNonPrimaryOrigin(final Operation operation) { protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException { assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin(); + assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; // resolve operation from external to internal final VersionValue versionValue = resolveDocVersion(delete); assert incrementVersionLookup(); @@ -1266,6 +1273,7 @@ protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOE currentlyDeleted, generateSeqNoForOperation(delete), delete.versionType().updateVersion(currentVersion, delete.version())); + advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoOfDeletion); } return plan; } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java 
b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 168444a226750..7fe49a5c9910c 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -516,6 +516,7 @@ public void updateShardState(final ShardRouting newRouting, */ engine.rollTranslogGeneration(); engine.fillSeqNoGaps(newPrimaryTerm); + engine.advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo()); replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(), getLocalCheckpoint()); primaryReplicaSyncer.accept(this, new ActionListener() { @Override @@ -1324,6 +1325,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException { }; innerOpenEngineAndTranslog(); getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE); + advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo()); } /** @@ -1947,6 +1949,7 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint(); synchronized (mutex) { replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex + advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo()); } } @@ -2720,4 +2723,27 @@ void resetEngineToGlobalCheckpoint() throws IOException { }); newEngine.recoverFromTranslog(translogRunner, globalCheckpoint); } + + /** + * Returns the maximum sequence number of either update operations (overwrite existing documents) or delete operations + * have been processed in this shard or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. + *
<p>
+ * The primary captures this value after executes a replication request, then transfers it to a replica before executing + * that replication request on a replica. + */ + public long getMaxSeqNoOfUpdatesOrDeletes() { + return getEngine().getMaxSeqNoOfUpdatesOrDeletes(); + } + + /** + * Advances the max_seq_no_of_updates marker of the engine of this shard to at least the given sequence number. + *
<p>
+ * A primary calls this method only once to initialize this maker after being promoted or when it finishes its + * recovery or relocation. Whereas a replica calls this method before executing a replication request or before + * applying translog operations in peer-recovery. + */ + public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { + getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); + assert seqNo <= getMaxSeqNoOfUpdatesOrDeletes(); + } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 8f9d90154f8f4..003cff2441319 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -679,6 +679,7 @@ public void testFlushIsDisabledDuringTranslogRecovery() throws IOException { expectThrows(IllegalStateException.class, () -> engine.flush(true, true)); assertTrue(engine.isRecovering()); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + engine.advanceMaxSeqNoOfUpdatesOrDeletes(engine.getLocalCheckpointTracker().getMaxSeqNo()); assertFalse(engine.isRecovering()); doc = testParsedDocument("2", null, testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc)); @@ -2678,6 +2679,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s } }) { engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + engine.advanceMaxSeqNoOfUpdatesOrDeletes(engine.getLocalCheckpointTracker().getMaxSeqNo()); final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc1)); globalCheckpoint.set(engine.getLocalCheckpoint()); @@ -3461,9 +3463,11 @@ public void testEngineMaxTimestampIsInitialized() throws IOException { engine.index(appendOnlyPrimary(doc, true, timestamp1)); assertEquals(timestamp1, 
engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); } - try (Store store = createStore(newFSDirectory(storeDir)); Engine engine = new InternalEngine(configSupplier.apply(store))) { + try (Store store = createStore(newFSDirectory(storeDir)); + InternalEngine engine = new InternalEngine(configSupplier.apply(store))) { assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + engine.advanceMaxSeqNoOfUpdatesOrDeletes(engine.getLocalCheckpointTracker().getMaxSeqNo()); assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); @@ -4353,7 +4357,7 @@ public void testKeepTranslogAfterGlobalCheckpoint() throws Exception { final EngineConfig engineConfig = config(indexSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null, () -> globalCheckpoint.get()); - try (Engine engine = new InternalEngine(engineConfig) { + try (InternalEngine engine = new InternalEngine(engineConfig) { @Override protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException { // Advance the global checkpoint during the flush to create a lag between a persisted global checkpoint in the translog @@ -4365,6 +4369,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s } }) { engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + engine.advanceMaxSeqNoOfUpdatesOrDeletes(engine.getLocalCheckpointTracker().getMaxSeqNo()); int numDocs = scaledRandomIntBetween(10, 100); for (int docId = 0; docId < numDocs; docId++) { ParseContext.Document document = testDocumentWithTextField(); @@ -5033,6 +5038,47 @@ public void testAcquireSearcherOnClosingEngine() throws Exception { expectThrows(AlreadyClosedException.class, () -> 
engine.acquireSearcher("test")); } + public void testTrackMaxSeqNoOfUpdatesOrDeletes() throws Exception { + engine.close(); + Set liveDocIds = new HashSet<>(); + engine = new InternalEngine(engine.config()); + assertThat(engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-2L)); + engine.advanceMaxSeqNoOfUpdatesOrDeletes(randomLongBetween(-1L, 50L)); + int numOps = between(1, 500); + for (int i = 0; i < numOps; i++) { + long currentMaxSeqNoOfUpdates = engine.getMaxSeqNoOfUpdatesOrDeletes(); + ParsedDocument doc = createParsedDoc(Integer.toString(between(1, 100)), null); + if (randomBoolean()) { + if (randomBoolean()) { + Engine.IndexResult result = engine.index(indexForDoc(doc)); + if (liveDocIds.add(doc.id()) == false) { + assertThat("update operations on primary must advance max_seq_no_of_updates", + engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, result.getSeqNo()))); + } else { + assertThat(engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(currentMaxSeqNoOfUpdates)); + } + } else { + Engine.DeleteResult result = engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get())); + liveDocIds.remove(doc.id()); + assertThat("delete operations on primary must advance max_seq_no_of_updates", + engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, result.getSeqNo()))); + } + } else { + long maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo(); + long seqNo = randomLongBetween(maxSeqNo + 1, maxSeqNo + 10); + if (randomBoolean()) { + engine.index(replicaIndexForDoc(doc, 1, seqNo, randomBoolean())); + liveDocIds.add(doc.id()); + } else { + engine.delete(replicaDeleteForDoc(doc.id(), 1, seqNo, threadPool.relativeTimeInMillis())); + liveDocIds.remove(doc.id()); + } + assertThat("non-primary operations should not advance max_seq_no_of_updates", + engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(currentMaxSeqNoOfUpdates)); + } + } + } + static void trimUnsafeCommits(EngineConfig 
config) throws IOException { final Store store = config.getStore(); final TranslogConfig translogConfig = config.getTranslogConfig(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 9a5df39a970a9..7119e76c1fa70 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -947,6 +947,7 @@ public void onFailure(Exception e) { resyncLatch.await(); assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo)); assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo)); + assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(maxSeqNo)); closeShard(indexShard, false); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 2492ab4cd8a08..804afa35211c1 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -128,6 +128,7 @@ indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilari EngineTestCase.tombstoneDocSupplier()); engine = new InternalEngine(config); engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE); + engine.advanceMaxSeqNoOfUpdatesOrDeletes(engine.getSeqNoStats(-1).getMaxSeqNo()); listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); } diff --git a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java index 4483ce0d60642..ea23ae6308e47 100644 --- a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java +++ b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java @@ -239,7 +239,8 @@ public void testUnallocatedShardsDoesNotHang() 
throws InterruptedException { private void indexDoc(Engine engine, String id) throws IOException { final ParsedDocument doc = InternalEngineTests.createParsedDoc(id, null); - final Engine.IndexResult indexResult = engine.index(new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), 1L, doc)); + final Engine.IndexResult indexResult = engine.index(new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), doc, + engine.getLocalCheckpoint() + 1, 1L, 1L, null, Engine.Operation.Origin.REPLICA, randomLong(), -1L, false)); assertThat(indexResult.getFailure(), nullValue()); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 86f7bd903cc3b..795babce6997b 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -478,6 +478,8 @@ private InternalEngine createEngine(@Nullable IndexWriterFactory indexWriterFact } InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config); internalEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + // IndexShard initializes this value after replaying local translog. 
+ internalEngine.advanceMaxSeqNoOfUpdatesOrDeletes(internalEngine.getLocalCheckpointTracker().getMaxSeqNo()); return internalEngine; } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 78ce5bc500ce8..7f561f55872e7 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -94,6 +94,7 @@ import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; /** @@ -444,6 +445,7 @@ protected IndexShard newStartedShard(CheckedFunction Date: Tue, 18 Sep 2018 22:16:22 -0400 Subject: [PATCH 02/10] make it final --- .../src/main/java/org/elasticsearch/index/engine/Engine.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 3d8cebc5a3d3f..e5699512321b0 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1707,7 +1707,7 @@ public interface TranslogRecoveryRunner { * an update or a delete operation. Whereas a replica engine never updates this value by itself but * only inherits the latest value from its primary. In both cases, this value never goes backwards. */ - public long getMaxSeqNoOfUpdatesOrDeletes() { + public final long getMaxSeqNoOfUpdatesOrDeletes() { return maxSeqNoOfUpdatesOrDeletes.get(); } @@ -1715,7 +1715,7 @@ public long getMaxSeqNoOfUpdatesOrDeletes() { * Advances the max_seq_no_of_updates marker of this engine to at least the given sequence number. 
* @see #getMaxSeqNoOfUpdatesOrDeletes() */ - public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { + public final void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { maxSeqNoOfUpdatesOrDeletes.updateAndGet(curr -> Math.max(curr, seqNo)); assert maxSeqNoOfUpdatesOrDeletes.get() >= seqNo; } From d84de4ac184924f3581271f0ace75e579a7bafc0 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 19 Sep 2018 13:48:51 -0400 Subject: [PATCH 03/10] =?UTF-8?q?boaz=E2=80=99s?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../elasticsearch/index/engine/Engine.java | 12 +++++-- .../index/engine/InternalEngine.java | 9 +++-- .../elasticsearch/index/shard/IndexShard.java | 29 ++++++++++----- .../index/engine/InternalEngineTests.java | 35 ++++++------------- 4 files changed, 45 insertions(+), 40 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index e5699512321b0..ff6952272732c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -127,8 +127,13 @@ public abstract class Engine implements Closeable { * inactive shards. */ protected volatile long lastWriteNanos = System.nanoTime(); - // The maximum sequence number of either update or delete operations have been processed by this engine. - // This value is started with an unassigned status (-2) and will be initialized from outside. + + /* + * This marker tracks the max seq_no of either update operations or delete operations have been processed in this engine. + * An update operation is just an index request which overwrites existing documents with the same docId in the Lucene index. + * This marker is started with an unassigned status(-2), then will be initialized from outside (via advanceMaxSeqNoOfUpdatesOrDeletes). 
+ * The optimization using seq_no will be disabled (regardless of other conditions) if this marker is still uninitialized (-2). + */ private final AtomicLong maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); protected Engine(EngineConfig engineConfig) { @@ -1702,6 +1707,9 @@ public interface TranslogRecoveryRunner { /** * Returns the maximum sequence number of either update or delete operations have been processed * in this engine or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. + * An index request is considered as an update operation if it overwritten the existing documents + * in Lucene index with the same document id. + * *
<p>
* For a primary engine, this value is initialized once, then advanced internally when it processes * an update or a delete operation. Whereas a replica engine never updates this value by itself but diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 081d8c26df25f..0510a008404e5 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -928,7 +928,6 @@ protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOExc if (canOptimizeAddDocument(index)) { if (mayHaveBeenIndexedBefore(index)) { plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L); - advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoForIndexing); versionMap.enforceSafeAccess(); } else { plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index)); @@ -956,12 +955,12 @@ protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOExc generateSeqNoForOperation(index), index.versionType().updateVersion(currentVersion, index.version()) ); - final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false; - if (toAppend == false) { - advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoForIndexing); - } } } + final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false; + if (toAppend == false) { + advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoForIndexing); + } return plan; } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 7fe49a5c9910c..d8e3379daf940 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -516,7 +516,12 @@ public void updateShardState(final ShardRouting 
newRouting, */ engine.rollTranslogGeneration(); engine.fillSeqNoGaps(newPrimaryTerm); - engine.advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo()); + if (getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO) { + // TODO: Enable this assertion after we replicate max_seq_no_updates during replication + // assert indexSettings.getIndexVersionCreated().before(Version.V_7_0_0_alpha1) : + // indexSettings.getIndexVersionCreated(); + advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo()); + } replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(), getLocalCheckpoint()); primaryReplicaSyncer.accept(this, new ActionListener() { @Override @@ -1949,7 +1954,13 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint(); synchronized (mutex) { replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex - advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo()); + // If the old primary was on an old version, this primary (was replica before) + // does not have max_of_updates yet. Thus we need to bootstrap it manually. + if (getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO) { + // TODO: Enable this assertion after we replicate max_seq_no_updates during replication + // assert indexSettings.getIndexVersionCreated().before(Version.V_7_0_0_alpha1) : indexSettings.getIndexVersionCreated(); + advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo()); + } } } @@ -2725,11 +2736,12 @@ void resetEngineToGlobalCheckpoint() throws IOException { } /** - * Returns the maximum sequence number of either update operations (overwrite existing documents) or delete operations - * have been processed in this shard or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. 
+ * Returns the maximum sequence number of either update or delete operations have been processed in this shard + * or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered + * as an update operation if it overwritten the existing documents in Lucene index with the same document id. *
<p>
- * The primary captures this value after executes a replication request, then transfers it to a replica before executing - * that replication request on a replica. + * The primary captures this value after executes a replication request, then transfers it to a replica before + * executing that replication request on a replica. */ public long getMaxSeqNoOfUpdatesOrDeletes() { return getEngine().getMaxSeqNoOfUpdatesOrDeletes(); @@ -2738,9 +2750,8 @@ public long getMaxSeqNoOfUpdatesOrDeletes() { /** * Advances the max_seq_no_of_updates marker of the engine of this shard to at least the given sequence number. *
<p>
- * A primary calls this method only once to initialize this maker after being promoted or when it finishes its - * recovery or relocation. Whereas a replica calls this method before executing a replication request or before - * applying translog operations in peer-recovery. + * A primary calls this method once to initialize this maker after it finishes the local recovery. Whereas a replica + * calls this method before executing a replication request or before applying translog operations in peer-recovery. */ public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 003cff2441319..b37d6b213fafe 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -5038,7 +5038,7 @@ public void testAcquireSearcherOnClosingEngine() throws Exception { expectThrows(AlreadyClosedException.class, () -> engine.acquireSearcher("test")); } - public void testTrackMaxSeqNoOfUpdatesOrDeletes() throws Exception { + public void testTrackMaxSeqNoOfUpdatesOrDeletesOnPrimary() throws Exception { engine.close(); Set liveDocIds = new HashSet<>(); engine = new InternalEngine(engine.config()); @@ -5049,32 +5049,19 @@ public void testTrackMaxSeqNoOfUpdatesOrDeletes() throws Exception { long currentMaxSeqNoOfUpdates = engine.getMaxSeqNoOfUpdatesOrDeletes(); ParsedDocument doc = createParsedDoc(Integer.toString(between(1, 100)), null); if (randomBoolean()) { - if (randomBoolean()) { - Engine.IndexResult result = engine.index(indexForDoc(doc)); - if (liveDocIds.add(doc.id()) == false) { - assertThat("update operations on primary must advance max_seq_no_of_updates", - engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, 
result.getSeqNo()))); - } else { - assertThat(engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(currentMaxSeqNoOfUpdates)); - } - } else { - Engine.DeleteResult result = engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get())); - liveDocIds.remove(doc.id()); - assertThat("delete operations on primary must advance max_seq_no_of_updates", + Engine.IndexResult result = engine.index(indexForDoc(doc)); + if (liveDocIds.add(doc.id()) == false) { + assertThat("update operations on primary must advance max_seq_no_of_updates", engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, result.getSeqNo()))); - } - } else { - long maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo(); - long seqNo = randomLongBetween(maxSeqNo + 1, maxSeqNo + 10); - if (randomBoolean()) { - engine.index(replicaIndexForDoc(doc, 1, seqNo, randomBoolean())); - liveDocIds.add(doc.id()); } else { - engine.delete(replicaDeleteForDoc(doc.id(), 1, seqNo, threadPool.relativeTimeInMillis())); - liveDocIds.remove(doc.id()); + assertThat("append operations should not advance max_seq_no_of_updates", + engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(currentMaxSeqNoOfUpdates)); } - assertThat("non-primary operations should not advance max_seq_no_of_updates", - engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(currentMaxSeqNoOfUpdates)); + } else { + Engine.DeleteResult result = engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get())); + liveDocIds.remove(doc.id()); + assertThat("delete operations on primary must advance max_seq_no_of_updates", + engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, result.getSeqNo()))); } } } From f27d5f88c9513dd929a9e7b788f2259fcc66dc95 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 20 Sep 2018 12:16:13 -0400 Subject: [PATCH 04/10] initialize on the primary --- .../elasticsearch/index/engine/Engine.java | 27 ++++++++++-------- 
.../index/engine/InternalEngine.java | 8 ++++++ .../index/engine/ReadOnlyEngine.java | 5 ++++ .../elasticsearch/index/shard/IndexShard.java | 24 ++++++++++------ .../index/translog/Translog.java | 14 ++++++++++ .../index/shard/IndexShardTests.java | 1 + .../index/translog/TranslogTests.java | 28 ++++++++++++++++++- 7 files changed, 86 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index af2bbbc5aecfc..bed59b1e2b9f5 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -139,7 +139,7 @@ public abstract class Engine implements Closeable { /* * This marker tracks the max seq_no of either update operations or delete operations have been processed in this engine. * An update operation is just an index request which overwrites existing documents with the same docId in the Lucene index. - * This marker is started with an unassigned status(-2), then will be initialized from outside (via advanceMaxSeqNoOfUpdatesOrDeletes). + * This marker is started uninitialized (-2), then will be initialized from outside (via initializeMaxSeqNoOfUpdatesOrDeletes). * The optimization using seq_no will be disabled (regardless of other conditions) if this marker is still uninitialized (-2). */ private final AtomicLong maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); @@ -1776,26 +1776,29 @@ public interface TranslogRecoveryRunner { } /** - * Returns the maximum sequence number of either update or delete operations have been processed - * in this engine or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. - * An index request is considered as an update operation if it overwritten the existing documents - * in Lucene index with the same document id. 
+ * Returns the maximum sequence number of either update or delete operations have been processed in this engine + * or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered + * as an update operation if it overwritten the existing documents in Lucene index with the same document id. * - *

- * For a primary engine, this value is initialized once, then advanced internally when it processes - * an update or a delete operation. Whereas a replica engine never updates this value by itself but - * only inherits the latest value from its primary. In both cases, this value never goes backwards. + * @see #initializeMaxSeqNoOfUpdatesOrDeletes() + * @see #advanceMaxSeqNoOfUpdatesOrDeletes(long) */ public final long getMaxSeqNoOfUpdatesOrDeletes() { return maxSeqNoOfUpdatesOrDeletes.get(); } /** - * Advances the max_seq_no_of_updates marker of this engine to at least the given sequence number. - * @see #getMaxSeqNoOfUpdatesOrDeletes() + * A primary shard calls this method once to initialize the max_seq_no_of_updates marker using the + * max_seq_no from Lucene index and translog before replaying the local translog in its local recovery. + */ + public abstract void initializeMaxSeqNoOfUpdatesOrDeletes(); + + /** + * A replica shard receives a new max_seq_no_of_updates from its primary shard, then call this method to + * advance this marker to at least the given sequence number. 
*/ public final void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { maxSeqNoOfUpdatesOrDeletes.updateAndGet(curr -> Math.max(curr, seqNo)); - assert maxSeqNoOfUpdatesOrDeletes.get() >= seqNo; + assert maxSeqNoOfUpdatesOrDeletes.get() >= seqNo : maxSeqNoOfUpdatesOrDeletes.get() + " < " + seqNo; } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 0510a008404e5..3a9d3a6c5b014 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2538,4 +2538,12 @@ void updateRefreshedCheckpoint(long checkpoint) { assert refreshedCheckpoint.get() >= checkpoint : refreshedCheckpoint.get() + " < " + checkpoint; } } + + @Override + public void initializeMaxSeqNoOfUpdatesOrDeletes() { + assert getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO : + "max_seq_no_of_updates is already initialized [" + getMaxSeqNoOfUpdatesOrDeletes() + "]"; + final long maxSeqNo = SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.maxSeqNo()); + advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNo); + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 80b653939299f..ea18563589d56 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -373,4 +373,9 @@ public void maybePruneDeletes() { public DocsStats docStats() { return docsStats; } + + @Override + public void initializeMaxSeqNoOfUpdatesOrDeletes() { + advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats.getMaxSeqNo()); + } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 
e479ffb961284..9f8792ae1bcd0 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -516,7 +516,7 @@ public void updateShardState(final ShardRouting newRouting, // TODO: Enable this assertion after we replicate max_seq_no_updates during replication // assert indexSettings.getIndexVersionCreated().before(Version.V_7_0_0_alpha1) : // indexSettings.getIndexVersionCreated(); - advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo()); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); } replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(), getLocalCheckpoint()); primaryReplicaSyncer.accept(this, new ActionListener() { @@ -1305,8 +1305,9 @@ public void openEngineAndRecoverFromTranslog() throws IOException { translogRecoveryStats::incrementRecoveredOperations); }; innerOpenEngineAndTranslog(); - getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE); - advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo()); + final Engine engine = getEngine(); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE); } /** @@ -1935,7 +1936,7 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p if (getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO) { // TODO: Enable this assertion after we replicate max_seq_no_updates during replication // assert indexSettings.getIndexVersionCreated().before(Version.V_7_0_0_alpha1) : indexSettings.getIndexVersionCreated(); - advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo()); + getEngine().initializeMaxSeqNoOfUpdatesOrDeletes(); } } } @@ -2724,13 +2725,20 @@ public long getMaxSeqNoOfUpdatesOrDeletes() { } /** - * Advances the max_seq_no_of_updates marker of the engine of this shard to at least the given sequence number. 
+ * A replica calls this method to advance the max_seq_no_of_updates marker of its engine to at least the max_seq_no_of_updates + * value (piggybacked in a replication request) that it receives from its primary before executing that replication request. + *

+ * A replica shard also calls this method to bootstrap the max_seq_no_of_updates marker using the value that it received from + * the primary in peer-recovery before it replays remote translog operations from the primary. *

- * A primary calls this method once to initialize this maker after it finishes the local recovery. Whereas a replica - * calls this method before executing a replication request or before applying translog operations in peer-recovery. + * These transfers guarantee that every index/delete operation executing on a replica engine will observe this marker a value + * which is at least the value of the marker on the primary after that operation was executed on the primary. + * + * @see #acquireReplicaOperationPermit(long, long, ActionListener, String, Object) + * @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int) */ public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); - assert seqNo <= getMaxSeqNoOfUpdatesOrDeletes(); + assert seqNo <= getMaxSeqNoOfUpdatesOrDeletes() : getMaxSeqNoOfUpdatesOrDeletes() + " < " + seqNo; } } diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index f17acac37896d..9a7e98ca0b7a2 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -60,6 +60,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.OptionalLong; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -1825,6 +1826,19 @@ public String getTranslogUUID() { return translogUUID; } + /** + * Returns the max seq_no of translog operations found in this translog. Since this value is calculated based on the current + * existing readers, this value is not necessary to be the max seq_no of all operations have been stored in this translog. 
+ */ + public long maxSeqNo() { + try (ReleasableLock ignored = readLock.acquire()) { + ensureOpen(); + final OptionalLong maxSeqNo = Stream.concat(readers.stream(), Stream.of(current)) + .mapToLong(reader -> reader.getCheckpoint().maxSeqNo).max(); + assert maxSeqNo.isPresent() : "must have at least one translog generation"; + return maxSeqNo.getAsLong(); + } + } TranslogWriter getCurrent() { return current; diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 1a05bed71d5e7..1338725fe1445 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1702,6 +1702,7 @@ public void testRecoverFromStore() throws IOException { assertThat(newShard.getLocalCheckpoint(), equalTo(totalOps - 1L)); assertThat(newShard.getReplicationTracker().getTrackedLocalCheckpointForShard(newShard.routingEntry().allocationId().getId()) .getLocalCheckpoint(), equalTo(totalOps - 1L)); + assertThat(newShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(totalOps - 1L)); assertDocCount(newShard, totalOps); assertThat(newShard.getHistoryUUID(), equalTo(historyUUID)); closeShards(newShard); diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index a0e0c481e5f86..eaa98fa277b38 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -184,7 +184,7 @@ private void rollAndCommit(Translog translog) throws IOException { markCurrentGenAsCommitted(translog); } - private void commit(Translog translog, long genToRetain, long genToCommit) throws IOException { + private long commit(Translog translog, long genToRetain, long genToCommit) throws IOException { final TranslogDeletionPolicy 
deletionPolicy = translog.getDeletionPolicy(); deletionPolicy.setTranslogGenerationOfLastCommit(genToCommit); deletionPolicy.setMinTranslogGenerationForRecovery(genToRetain); @@ -192,6 +192,7 @@ private void commit(Translog translog, long genToRetain, long genToCommit) throw translog.trimUnreferencedReaders(); assertThat(minGenRequired, equalTo(translog.getMinFileGeneration())); assertFilePresences(translog); + return minGenRequired; } @Override @@ -3054,6 +3055,31 @@ void callCloseOnTragicEvent() { misbehavingTranslog.callCloseOnTragicEvent(); } + public void testMaxSeqNo() throws Exception { + Map maxSeqNoPerGeneration = new HashMap<>(); + for (int iterations = between(1, 10), i = 0; i < iterations; i++) { + long startSeqNo = randomLongBetween(0, Integer.MAX_VALUE); + List seqNos = LongStream.range(startSeqNo, startSeqNo + randomInt(100)).boxed().collect(Collectors.toList()); + Randomness.shuffle(seqNos); + for (long seqNo : seqNos) { + if (frequently()) { + translog.add(new Translog.Index("test", "id", seqNo, primaryTerm.get(), new byte[]{1})); + maxSeqNoPerGeneration.compute(translog.currentFileGeneration(), + (key, existing) -> existing == null ? seqNo : Math.max(existing, seqNo)); + } + } + translog.rollGeneration(); + } + translog.sync(); + assertThat(translog.maxSeqNo(), + equalTo(maxSeqNoPerGeneration.isEmpty() ? 
SequenceNumbers.NO_OPS_PERFORMED : Collections.max(maxSeqNoPerGeneration.values()))); + long minRetainedGen = commit(translog, randomLongBetween(1, translog.currentFileGeneration()), translog.currentFileGeneration()); + long expectedMaxSeqNo = maxSeqNoPerGeneration.entrySet().stream() + .filter(e -> e.getKey() >= minRetainedGen).mapToLong(e -> e.getValue()) + .max().orElse(SequenceNumbers.NO_OPS_PERFORMED); + assertThat(translog.maxSeqNo(), equalTo(expectedMaxSeqNo)); + } + static class SortedSnapshot implements Translog.Snapshot { private final Translog.Snapshot snapshot; private List operations = null; From a7269dc56e5e238fa9fabd00e11ffb2e80318c83 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 20 Sep 2018 14:45:35 -0400 Subject: [PATCH 05/10] bootstrap --- .../org/elasticsearch/index/engine/Engine.java | 14 ++++++++------ .../org/elasticsearch/index/shard/IndexShard.java | 12 +++++++----- .../index/engine/InternalEngineTests.java | 10 +++++----- .../index/shard/RefreshListenersTests.java | 2 +- .../elasticsearch/index/engine/EngineTestCase.java | 4 ++-- 5 files changed, 23 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 3100fb3c6bbb0..c1d7c3380853d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -138,9 +138,11 @@ public abstract class Engine implements Closeable { /* * This marker tracks the max seq_no of either update operations or delete operations have been processed in this engine. - * An update operation is just an index request which overwrites existing documents with the same docId in the Lucene index. - * This marker is started uninitialized (-2), then will be initialized from outside (via initializeMaxSeqNoOfUpdatesOrDeletes). 
- * The optimization using seq_no will be disabled (regardless of other conditions) if this marker is still uninitialized (-2). + * An index request is considered as an update if it overwrites existing documents with the same docId in the Lucene index. + * This marker is started uninitialized (-2), and the optimization using seq_no will be disabled if this marker is uninitialized. + * The value of this marker never goes backwards, and is updated/changed differently on primary and replica: + * 1. A primary initializes this marker once using the max_seq_no from its history, then advances when processing an update or delete. + * 2. A replica never advances this marker by itself but only inherits from its primary (via advanceMaxSeqNoOfUpdatesOrDeletes). */ private final AtomicLong maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); @@ -1778,7 +1780,7 @@ public interface TranslogRecoveryRunner { /** * Returns the maximum sequence number of either update or delete operations have been processed in this engine * or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered - * as an update operation if it overwritten the existing documents in Lucene index with the same document id. + * as an update operation if it overwrites the existing documents in Lucene index with the same document id. * * @see #initializeMaxSeqNoOfUpdatesOrDeletes() * @see #advanceMaxSeqNoOfUpdatesOrDeletes(long) @@ -1794,8 +1796,8 @@ public final long getMaxSeqNoOfUpdatesOrDeletes() { public abstract void initializeMaxSeqNoOfUpdatesOrDeletes(); /** - * A replica shard receives a new max_seq_no_of_updates from its primary shard, then call this method to - * advance this marker to at least the given sequence number. + * A replica shard receives a new max_seq_no_of_updates from its primary shard, then calls this method + * to advance this marker to at least the given sequence number. 
*/ public final void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { maxSeqNoOfUpdatesOrDeletes.updateAndGet(curr -> Math.max(curr, seqNo)); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 97564335c49e7..f0cf7d8d3cdc7 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2716,7 +2716,7 @@ void resetEngineToGlobalCheckpoint() throws IOException { /** * Returns the maximum sequence number of either update or delete operations have been processed in this shard * or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered - * as an update operation if it overwritten the existing documents in Lucene index with the same document id. + * as an update operation if it overwrites the existing documents in Lucene index with the same document id. *

* The primary captures this value after executes a replication request, then transfers it to a replica before * executing that replication request on a replica. @@ -2728,12 +2728,14 @@ public long getMaxSeqNoOfUpdatesOrDeletes() { /** * A replica calls this method to advance the max_seq_no_of_updates marker of its engine to at least the max_seq_no_of_updates * value (piggybacked in a replication request) that it receives from its primary before executing that replication request. + * The receiving value is at least the highest max_seq_no_of_updates of all index/delete operations in that replication request. *

- * A replica shard also calls this method to bootstrap the max_seq_no_of_updates marker using the value that it received from - * the primary in peer-recovery before it replays remote translog operations from the primary. + * A replica shard also calls this method to bootstrap the max_seq_no_of_updates marker with the value that it received from + * the primary in peer-recovery, before it replays remote translog operations from the primary. The receiving value is at least + * the highest max_seq_no_of_updates of all translog operations will be replayed in that peer-recovery. *

- * These transfers guarantee that every index/delete operation executing on a replica engine will observe this marker a value - * which is at least the value of the marker on the primary after that operation was executed on the primary. + * These transfers guarantee that every index/delete operation when executing on a replica engine will observe this marker a value + * which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary. * * @see #acquireReplicaOperationPermit(long, long, ActionListener, String, Object) * @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index ae8fc03e87dc4..b26e4eb4f1d49 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -680,7 +680,7 @@ public void testFlushIsDisabledDuringTranslogRecovery() throws IOException { expectThrows(IllegalStateException.class, () -> engine.flush(true, true)); assertTrue(engine.isRecovering()); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); - engine.advanceMaxSeqNoOfUpdatesOrDeletes(engine.getLocalCheckpointTracker().getMaxSeqNo()); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); assertFalse(engine.isRecovering()); doc = testParsedDocument("2", null, testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc)); @@ -2680,7 +2680,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s } }) { engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); - engine.advanceMaxSeqNoOfUpdatesOrDeletes(engine.getLocalCheckpointTracker().getMaxSeqNo()); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); final ParsedDocument doc1 = testParsedDocument("1", null, 
testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc1)); globalCheckpoint.set(engine.getLocalCheckpoint()); @@ -3468,7 +3468,7 @@ public void testEngineMaxTimestampIsInitialized() throws IOException { InternalEngine engine = new InternalEngine(configSupplier.apply(store))) { assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); - engine.advanceMaxSeqNoOfUpdatesOrDeletes(engine.getLocalCheckpointTracker().getMaxSeqNo()); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); @@ -4366,7 +4366,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s } }) { engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); - engine.advanceMaxSeqNoOfUpdatesOrDeletes(engine.getLocalCheckpointTracker().getMaxSeqNo()); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); int numDocs = scaledRandomIntBetween(10, 100); for (int docId = 0; docId < numDocs; docId++) { ParseContext.Document document = testDocumentWithTextField(); @@ -5040,7 +5040,7 @@ public void testTrackMaxSeqNoOfUpdatesOrDeletesOnPrimary() throws Exception { Set liveDocIds = new HashSet<>(); engine = new InternalEngine(engine.config()); assertThat(engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-2L)); - engine.advanceMaxSeqNoOfUpdatesOrDeletes(randomLongBetween(-1L, 50L)); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); int numOps = between(1, 500); for (int i = 0; i < numOps; i++) { long currentMaxSeqNoOfUpdates = engine.getMaxSeqNoOfUpdatesOrDeletes(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java 
index 804afa35211c1..6111f60307003 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -128,7 +128,7 @@ indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilari EngineTestCase.tombstoneDocSupplier()); engine = new InternalEngine(config); engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE); - engine.advanceMaxSeqNoOfUpdatesOrDeletes(engine.getSeqNoStats(-1).getMaxSeqNo()); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 795babce6997b..2529548606cab 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -478,8 +478,8 @@ private InternalEngine createEngine(@Nullable IndexWriterFactory indexWriterFact } InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config); internalEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); - // IndexShard initializes this value after replaying local translog. - internalEngine.advanceMaxSeqNoOfUpdatesOrDeletes(internalEngine.getLocalCheckpointTracker().getMaxSeqNo()); + // IndexShard initializes this marker after replaying local translog. 
+ internalEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); return internalEngine; } From 2dd87585f72530ee9e3ac9f1bc60f16c5b364d27 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 20 Sep 2018 14:59:41 -0400 Subject: [PATCH 06/10] maxSeqNo -> getMaxSeqNo --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 2 +- .../main/java/org/elasticsearch/index/translog/Translog.java | 2 +- .../java/org/elasticsearch/index/translog/TranslogTests.java | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 351b143d62bb6..762336cb4a45b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2538,7 +2538,7 @@ void updateRefreshedCheckpoint(long checkpoint) { public void initializeMaxSeqNoOfUpdatesOrDeletes() { assert getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is already initialized [" + getMaxSeqNoOfUpdatesOrDeletes() + "]"; - final long maxSeqNo = SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.maxSeqNo()); + final long maxSeqNo = SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo()); advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNo); } } diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 9a7e98ca0b7a2..0b91de81932b4 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -1830,7 +1830,7 @@ public String getTranslogUUID() { * Returns the max seq_no of translog operations found in this translog. 
Since this value is calculated based on the current * existing readers, this value is not necessary to be the max seq_no of all operations have been stored in this translog. */ - public long maxSeqNo() { + public long getMaxSeqNo() { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); final OptionalLong maxSeqNo = Stream.concat(readers.stream(), Stream.of(current)) diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index eaa98fa277b38..45bf7a700aa5d 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -3071,13 +3071,13 @@ public void testMaxSeqNo() throws Exception { translog.rollGeneration(); } translog.sync(); - assertThat(translog.maxSeqNo(), + assertThat(translog.getMaxSeqNo(), equalTo(maxSeqNoPerGeneration.isEmpty() ? SequenceNumbers.NO_OPS_PERFORMED : Collections.max(maxSeqNoPerGeneration.values()))); long minRetainedGen = commit(translog, randomLongBetween(1, translog.currentFileGeneration()), translog.currentFileGeneration()); long expectedMaxSeqNo = maxSeqNoPerGeneration.entrySet().stream() .filter(e -> e.getKey() >= minRetainedGen).mapToLong(e -> e.getValue()) .max().orElse(SequenceNumbers.NO_OPS_PERFORMED); - assertThat(translog.maxSeqNo(), equalTo(expectedMaxSeqNo)); + assertThat(translog.getMaxSeqNo(), equalTo(expectedMaxSeqNo)); } static class SortedSnapshot implements Translog.Snapshot { From dcc23fdf59fa281b839847e13ccf1cbff52072f4 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 20 Sep 2018 16:12:33 -0400 Subject: [PATCH 07/10] wording --- .../main/java/org/elasticsearch/index/shard/IndexShard.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 
f0cf7d8d3cdc7..891642d8d0d8a 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2728,11 +2728,12 @@ public long getMaxSeqNoOfUpdatesOrDeletes() { /** * A replica calls this method to advance the max_seq_no_of_updates marker of its engine to at least the max_seq_no_of_updates * value (piggybacked in a replication request) that it receives from its primary before executing that replication request. - * The receiving value is at least the highest max_seq_no_of_updates of all index/delete operations in that replication request. + * The receiving value is at least as high as the max_seq_no_of_updates on the primary was when any of the operations of that + * replication request were processed on it. *

* A replica shard also calls this method to bootstrap the max_seq_no_of_updates marker with the value that it received from * the primary in peer-recovery, before it replays remote translog operations from the primary. The receiving value is at least - * the highest max_seq_no_of_updates of all translog operations will be replayed in that peer-recovery. + * as high as the max_seq_no_of_updates on the primary was when any of these operations were processed on it. *

* These transfers guarantee that every index/delete operation when executing on a replica engine will observe this marker a value * which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary. From 94fa692dd7ee6e0e2a7e023202f60c4f2d66b619 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 20 Sep 2018 22:10:46 -0400 Subject: [PATCH 08/10] TEST: init msu before recover translog --- .../elasticsearch/index/engine/InternalEngineTests.java | 8 ++++---- .../elasticsearch/index/shard/RefreshListenersTests.java | 2 +- .../org/elasticsearch/index/engine/EngineTestCase.java | 3 +-- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index d8b8fe10677cf..b02cd2c9e4e35 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -679,8 +679,8 @@ public void testFlushIsDisabledDuringTranslogRecovery() throws IOException { engine = new InternalEngine(engine.config()); expectThrows(IllegalStateException.class, () -> engine.flush(true, true)); assertTrue(engine.isRecovering()); - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertFalse(engine.isRecovering()); doc = testParsedDocument("2", null, testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc)); @@ -2679,8 +2679,8 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s } } }) { - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); final ParsedDocument doc1 = testParsedDocument("1", null, 
testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc1)); globalCheckpoint.set(engine.getLocalCheckpoint()); @@ -3467,8 +3467,8 @@ public void testEngineMaxTimestampIsInitialized() throws IOException { try (Store store = createStore(newFSDirectory(storeDir)); InternalEngine engine = new InternalEngine(configSupplier.apply(store))) { assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); @@ -4367,8 +4367,8 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s super.commitIndexWriter(writer, translog, syncId); } }) { - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); int numDocs = scaledRandomIntBetween(10, 100); for (int docId = 0; docId < numDocs; docId++) { ParseContext.Document document = testDocumentWithTextField(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 6111f60307003..cbc08b19e8a07 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -127,8 +127,8 @@ indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilari new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED, () -> primaryTerm, EngineTestCase.tombstoneDocSupplier()); engine = new 
InternalEngine(config); - engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE); engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE); listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 2529548606cab..69a9f51ab698e 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -477,9 +477,8 @@ private InternalEngine createEngine(@Nullable IndexWriterFactory indexWriterFact } InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config); - internalEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); - // IndexShard initializes this marker after replaying local translog. 
internalEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); + internalEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); return internalEngine; } From e92e71e790f0be94d81b2c4ddf18037696a58d52 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 21 Sep 2018 11:45:57 -0400 Subject: [PATCH 09/10] assert msu ready when recover from translog --- .../index/engine/InternalEngine.java | 1 + .../elasticsearch/index/shard/IndexShard.java | 2 ++ .../index/engine/InternalEngineTests.java | 23 +++++++++++++++++-- .../index/engine/ReadOnlyEngineTests.java | 1 + 4 files changed, 25 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index f830913db4c20..2f38562b7af06 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -385,6 +385,7 @@ public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecover flushLock.lock(); try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); + assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is uninitialized"; if (pendingTranslogRecovery.get() == false) { throw new IllegalStateException("Engine has already been recovered"); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 958998f97b787..f5f8d70925f5f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2733,6 +2733,8 @@ void resetEngineToGlobalCheckpoint() throws IOException { engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> { // TODO: add a dedicate recovery stats for the reset translog }); + // TODO: do not use init method here but use advance with the 
max_seq_no received from the primary + newEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); newEngine.recoverFromTranslog(translogRunner, globalCheckpoint); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index b02cd2c9e4e35..e77203b83fee6 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -662,6 +662,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException { trimUnsafeCommits(engine.config()); engine = new InternalEngine(engine.config()); assertTrue(engine.isRecovering()); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test")); assertThat(counter.get(), equalTo(2)); @@ -709,7 +710,8 @@ public void testTranslogMultipleOperationsSameDocument() throws IOException { IOUtils.close(engine); } trimUnsafeCommits(engine.config()); - try (Engine recoveringEngine = new InternalEngine(engine.config())){ + try (Engine recoveringEngine = new InternalEngine(engine.config())) { + recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) { final TotalHitCountCollector collector = new TotalHitCountCollector(); @@ -746,6 +748,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s } }; assertThat(getTranslog(recoveringEngine).stats().getUncommittedOperations(), equalTo(docs)); + recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertTrue(committed.get()); } finally { @@ -780,6 +783,7 @@ public void testTranslogRecoveryWithMultipleGenerations() 
throws IOException { initialEngine.close(); trimUnsafeCommits(initialEngine.config()); recoveringEngine = new InternalEngine(initialEngine.config()); + recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), docs); @@ -813,6 +817,7 @@ public void testRecoveryFromTranslogUpToSeqNo() throws IOException { } trimUnsafeCommits(config); try (InternalEngine engine = new InternalEngine(config)) { + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertThat(engine.getLocalCheckpoint(), equalTo(maxSeqNo)); assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(maxSeqNo)); @@ -820,6 +825,7 @@ public void testRecoveryFromTranslogUpToSeqNo() throws IOException { trimUnsafeCommits(config); try (InternalEngine engine = new InternalEngine(config)) { long upToSeqNo = randomLongBetween(globalCheckpoint.get(), maxSeqNo); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, upToSeqNo); assertThat(engine.getLocalCheckpoint(), equalTo(upToSeqNo)); assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(upToSeqNo)); @@ -1204,6 +1210,7 @@ public void testSyncedFlushSurvivesEngineRestart() throws IOException { } trimUnsafeCommits(config); engine = new InternalEngine(config); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); } @@ -1223,6 +1230,7 @@ public void testSyncedFlushVanishesOnReplay() throws IOException { engine.close(); trimUnsafeCommits(config); engine = new InternalEngine(config); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, 
Long.MAX_VALUE); assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID)); } @@ -2198,7 +2206,8 @@ public void testSeqNoAndCheckpoints() throws IOException { } trimUnsafeCommits(initialEngine.engineConfig); - try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())){ + try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())) { + recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(primarySeqNo, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); @@ -2542,6 +2551,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException { assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); } assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); @@ -2559,6 +2569,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(2, engine.getTranslog().currentFileGeneration()); assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations()); @@ -2573,6 +2584,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); 
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("no changes - nothing to commit", "1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); @@ -2691,6 +2703,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s try (InternalEngine engine = new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpointSupplier))) { + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertVisibleCount(engine, 1); final long committedGen = Long.valueOf( @@ -2758,6 +2771,7 @@ public void testTranslogReplay() throws IOException { engine.close(); trimUnsafeCommits(copy(engine.config(), inSyncGlobalCheckpointSupplier)); engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier)); // we need to reuse the engine config unless the parser.mappingModified won't work + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertVisibleCount(engine, numDocs, false); @@ -3753,6 +3767,7 @@ public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws Bro } trimUnsafeCommits(initialEngine.config()); try (Engine recoveringEngine = new InternalEngine(initialEngine.config())) { + recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); recoveringEngine.fillSeqNoGaps(2); assertThat(recoveringEngine.getLocalCheckpoint(), greaterThanOrEqualTo((long) (docs - 1))); @@ -3864,6 +3879,7 @@ protected long doGenerateSeqNoForOperation(Operation operation) { throw new UnsupportedOperationException(); } }; + noOpEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); 
noOpEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm.get()); final String reason = "filling gaps"; @@ -4091,6 +4107,7 @@ public void testRestoreLocalHistoryFromTranslog() throws IOException { } trimUnsafeCommits(engineConfig); try (InternalEngine engine = new InternalEngine(engineConfig)) { + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, globalCheckpoint.get()); engine.restoreLocalHistoryFromTranslog(translogHandler); assertThat(getDocIds(engine, true), equalTo(prevDocs)); @@ -4138,6 +4155,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { trimUnsafeCommits(copy(replicaEngine.config(), globalCheckpoint::get)); recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get)); assertEquals(numDocsOnReplica, getTranslog(recoveringEngine).stats().getUncommittedOperations()); + recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpoint()); @@ -4174,6 +4192,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { if (flushed) { assertThat(recoveringEngine.getTranslogStats().getUncommittedOperations(), equalTo(0)); } + recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint()); diff --git a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java index 4080dd33d5341..90469d71944ef 100644 --- 
a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java @@ -95,6 +95,7 @@ public void testReadOnlyEngine() throws Exception { // Close and reopen the main engine InternalEngineTests.trimUnsafeCommits(config); try (InternalEngine recoveringEngine = new InternalEngine(config)) { + recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); // the locked down engine should still point to the previous commit assertThat(readOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); From 84a985fb5a549462c9e7948770f4600f3c57524d Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 21 Sep 2018 23:58:26 -0400 Subject: [PATCH 10/10] fix tests in following engine --- .../xpack/ccr/index/engine/FollowingEngineTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index b3e2d12227b59..c9a4de8f03cd4 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -271,6 +271,7 @@ private FollowingEngine createEngine(Store store, EngineConfig config) throws IO store.associateIndexWithNewTranslog(translogUuid); FollowingEngine followingEngine = new FollowingEngine(config); TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings()); + followingEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); followingEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); return followingEngine; }