Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1961,18 +1961,18 @@ public interface TranslogRecoveryRunner {
* Moreover, operations that are optimized using the MSU optimization must not be processed twice as this will create duplicates
* in Lucene. To avoid this we check the local checkpoint tracker to see if an operation was already processed.
*
* @see #initializeMaxSeqNoOfUpdatesOrDeletes()
* @see #reinitializeMaxSeqNoOfUpdatesOrDeletes()
* @see #advanceMaxSeqNoOfUpdatesOrDeletes(long)
*/
public final long getMaxSeqNoOfUpdatesOrDeletes() {
return maxSeqNoOfUpdatesOrDeletes.get();
}

/**
* A primary shard calls this method once to initialize the max_seq_no_of_updates marker using the
* A primary shard calls this method to re-initialize the max_seq_no_of_updates marker using the
* max_seq_no from Lucene index and translog before replaying the local translog in its local recovery.
*/
public abstract void initializeMaxSeqNoOfUpdatesOrDeletes();
public abstract void reinitializeMaxSeqNoOfUpdatesOrDeletes();

/**
* A replica shard receives a new max_seq_no_of_updates from its primary shard, then calls this method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2741,9 +2741,7 @@ private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean a
}

@Override
public void initializeMaxSeqNoOfUpdatesOrDeletes() {
assert getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO :
"max_seq_no_of_updates is already initialized [" + getMaxSeqNoOfUpdatesOrDeletes() + "]";
public void reinitializeMaxSeqNoOfUpdatesOrDeletes() {
final long maxSeqNo = SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo());
advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {
}

@Override
public void initializeMaxSeqNoOfUpdatesOrDeletes() {
public void reinitializeMaxSeqNoOfUpdatesOrDeletes() {
advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats.getMaxSeqNo());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,8 @@ public void updateShardState(final ShardRouting newRouting,
assert indexSettings.getIndexVersionCreated().before(Version.V_6_5_0);
engine.advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo());
}
// in case we previously reset engine, we need to forward MSU before replaying translog.
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.restoreLocalHistoryFromTranslog((resettingEngine, snapshot) ->
runTranslogRecovery(resettingEngine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {}));
/* Rolling the translog generation is not strictly needed here (as we will never have collisions between
Expand Down Expand Up @@ -1394,7 +1396,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
};
innerOpenEngineAndTranslog();
final Engine engine = getEngine();
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
trimUnsafeCommits(engine.config());
engine = new InternalEngine(engine.config());
assertTrue(engine.isRecovering());
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test"));
assertThat(counter.get(), equalTo(2));
Expand All @@ -695,7 +695,7 @@ public void testFlushIsDisabledDuringTranslogRecovery() throws IOException {
engine = new InternalEngine(engine.config());
expectThrows(IllegalStateException.class, () -> engine.flush(true, true));
assertTrue(engine.isRecovering());
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertFalse(engine.isRecovering());
doc = testParsedDocument("2", null, testDocumentWithTextField(), SOURCE, null);
Expand Down Expand Up @@ -728,7 +728,7 @@ public void testTranslogMultipleOperationsSameDocument() throws IOException {
}
trimUnsafeCommits(engine.config());
try (Engine recoveringEngine = new InternalEngine(engine.config())) {
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
final TotalHitCountCollector collector = new TotalHitCountCollector();
Expand Down Expand Up @@ -765,7 +765,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
}
};
assertThat(getTranslog(recoveringEngine).stats().getUncommittedOperations(), equalTo(docs));
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertTrue(committed.get());
} finally {
Expand Down Expand Up @@ -800,7 +800,7 @@ public void testTranslogRecoveryWithMultipleGenerations() throws IOException {
initialEngine.close();
trimUnsafeCommits(initialEngine.config());
recoveringEngine = new InternalEngine(initialEngine.config());
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), docs);
Expand Down Expand Up @@ -836,15 +836,15 @@ public void testRecoveryFromTranslogUpToSeqNo() throws IOException {
}
trimUnsafeCommits(config);
try (InternalEngine engine = new InternalEngine(config)) {
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertThat(engine.getLocalCheckpoint(), equalTo(maxSeqNo));
assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(maxSeqNo));
}
trimUnsafeCommits(config);
try (InternalEngine engine = new InternalEngine(config)) {
long upToSeqNo = randomLongBetween(globalCheckpoint.get(), maxSeqNo);
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, upToSeqNo);
assertThat(engine.getLocalCheckpoint(), equalTo(upToSeqNo));
assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(upToSeqNo));
Expand Down Expand Up @@ -1261,7 +1261,7 @@ public void testSyncedFlushSurvivesEngineRestart() throws IOException {
}
trimUnsafeCommits(config);
engine = new InternalEngine(config);
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
}
Expand All @@ -1282,7 +1282,7 @@ public void testSyncedFlushVanishesOnReplay() throws IOException {
engine.close();
trimUnsafeCommits(config);
engine = new InternalEngine(config);
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertNull("Sync ID must be gone since we have a document to replay",
engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID));
Expand Down Expand Up @@ -2381,7 +2381,7 @@ public void testSeqNoAndCheckpoints() throws IOException {

trimUnsafeCommits(initialEngine.engineConfig);
try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())) {
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);

assertEquals(primarySeqNo, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
Expand Down Expand Up @@ -2737,7 +2737,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException {
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
}
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
Expand All @@ -2756,7 +2756,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(2, engine.getTranslog().currentFileGeneration());
assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations());
Expand All @@ -2771,7 +2771,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("no changes - nothing to commit", "1",
Expand Down Expand Up @@ -2879,7 +2879,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
}
}
}) {
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
final ParsedDocument doc1 = testParsedDocument("1", null,
testDocumentWithTextField(), SOURCE, null);
Expand All @@ -2892,7 +2892,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
try (InternalEngine engine =
new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null,
globalCheckpointSupplier))) {
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertVisibleCount(engine, 1);
final long committedGen = Long.valueOf(
Expand Down Expand Up @@ -2963,7 +2963,7 @@ public void testTranslogReplay() throws IOException {
trimUnsafeCommits(copy(engine.config(), inSyncGlobalCheckpointSupplier));
// we need to reuse the engine config unless the parser.mappingModified won't work
engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier));
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);

assertVisibleCount(engine, numDocs, false);
Expand Down Expand Up @@ -3726,7 +3726,7 @@ public void testEngineMaxTimestampIsInitialized() throws IOException {
InternalEngine engine = new InternalEngine(configSupplier.apply(store))) {
assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp());
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(timestamp1, engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp());
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(),
Expand Down Expand Up @@ -4094,7 +4094,7 @@ public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws Bro
}
trimUnsafeCommits(initialEngine.config());
try (Engine recoveringEngine = new InternalEngine(initialEngine.config())) {
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
recoveringEngine.fillSeqNoGaps(2);
assertThat(recoveringEngine.getLocalCheckpoint(), greaterThanOrEqualTo((long) (docs - 1)));
Expand Down Expand Up @@ -4207,7 +4207,7 @@ protected long doGenerateSeqNoForOperation(Operation operation) {
throw new UnsupportedOperationException();
}
};
noOpEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
noOpEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
noOpEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm.get());
final String reason = "filling gaps";
Expand Down Expand Up @@ -4444,7 +4444,7 @@ public void testRestoreLocalHistoryFromTranslog() throws IOException {
}
trimUnsafeCommits(engineConfig);
try (InternalEngine engine = new InternalEngine(engineConfig)) {
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, globalCheckpoint.get());
engine.restoreLocalHistoryFromTranslog(translogHandler);
assertThat(getDocIds(engine, true), equalTo(prevDocs));
Expand Down Expand Up @@ -4492,7 +4492,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
trimUnsafeCommits(copy(replicaEngine.config(), globalCheckpoint::get));
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
assertEquals(numDocsOnReplica, getTranslog(recoveringEngine).stats().getUncommittedOperations());
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpoint());
Expand Down Expand Up @@ -4529,7 +4529,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
if (flushed) {
assertThat(recoveringEngine.getTranslogStats().getUncommittedOperations(), equalTo(0));
}
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint());
Expand Down Expand Up @@ -4724,7 +4724,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
super.commitIndexWriter(writer, translog, syncId);
}
}) {
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
int numDocs = scaledRandomIntBetween(10, 100);
for (int docId = 0; docId < numDocs; docId++) {
Expand Down Expand Up @@ -5500,7 +5500,7 @@ public void testTrackMaxSeqNoOfUpdatesOrDeletesOnPrimary() throws Exception {
Set<String> liveDocIds = new HashSet<>();
engine = new InternalEngine(engine.config());
assertThat(engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-2L));
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
int numOps = between(1, 500);
for (int i = 0; i < numOps; i++) {
long currentMaxSeqNoOfUpdates = engine.getMaxSeqNoOfUpdatesOrDeletes();
Expand Down Expand Up @@ -5571,7 +5571,7 @@ public void testRebuildLocalCheckpointTracker() throws Exception {
"seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + "checkpoint=" + tracker.getCheckpoint(),
tracker.contains(op.seqNo()), equalTo(seqNosInSafeCommit.contains(op.seqNo())));
}
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertThat(getDocIds(engine, true), equalTo(docs));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void testReadOnlyEngine() throws Exception {
// Close and reopen the main engine
InternalEngineTests.trimUnsafeCommits(config);
try (InternalEngine recoveringEngine = new InternalEngine(config)) {
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
// the locked down engine should still point to the previous commit
assertThat(readOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint()));
Expand Down Expand Up @@ -235,7 +235,7 @@ public void testRecoverFromTranslogAppliesNoOperations() throws IOException {
}
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) {
final TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings());
readOnlyEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
readOnlyEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
readOnlyEngine.recoverFromTranslog(translogHandler, randomNonNegativeLong());

assertThat(translogHandler.appliedOperations(), equalTo(0L));
Expand Down
Loading