Skip to content

Commit 002f763

Browse files
authored
Restore local history from translog on promotion (#33616)
If a shard was serving as a replica when another shard was promoted to primary, then its Lucene index was reset to the global checkpoint. However, if the new primary fails before the primary/replica resync completes and we are now being promoted, we have to restore the reverted operations by replaying the translog to avoid losing acknowledged writes. Relates #33473 Relates #32867
1 parent b13a434 commit 002f763

File tree

7 files changed

+68
-81
lines changed

7 files changed

+68
-81
lines changed

qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,6 @@ protected void doRun() throws Exception {
111111
return future;
112112
}
113113

114-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/33616")
115114
public void testRecoveryWithConcurrentIndexing() throws Exception {
116115
final String index = "recovery_with_concurrent_indexing";
117116
Response response = client().performRequest(new Request("GET", "_nodes"));
@@ -184,7 +183,6 @@ private String getNodeId(Predicate<Version> versionPredicate) throws IOException
184183
}
185184

186185

187-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/33616")
188186
public void testRelocationWithConcurrentIndexing() throws Exception {
189187
final String index = "relocation_with_concurrent_indexing";
190188
switch (CLUSTER_TYPE) {

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1720,12 +1720,12 @@ public interface Warmer {
17201720
public abstract void deactivateThrottling();
17211721

17221722
/**
1723-
* Marks operations in the translog as completed. This is used to restore the state of the local checkpoint tracker on primary
1724-
* promotion.
1723+
* This method replays the translog to restore the Lucene index, which might have been reverted previously.
1724+
* This ensures that all acknowledged writes are restored correctly when this engine is promoted.
17251725
*
1726-
* @throws IOException if an I/O exception occurred reading the translog
1726+
* @return the number of translog operations that have been recovered
17271727
*/
1728-
public abstract void restoreLocalCheckpointFromTranslog() throws IOException;
1728+
public abstract int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) throws IOException;
17291729

17301730
/**
17311731
* Fills up the local checkpoints history with no-ops until the local checkpoint

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -332,17 +332,12 @@ protected int getRefCount(IndexSearcher reference) {
332332
}
333333

334334
@Override
335-
public void restoreLocalCheckpointFromTranslog() throws IOException {
336-
try (ReleasableLock ignored = writeLock.acquire()) {
335+
public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) throws IOException {
336+
try (ReleasableLock ignored = readLock.acquire()) {
337337
ensureOpen();
338338
final long localCheckpoint = localCheckpointTracker.getCheckpoint();
339339
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) {
340-
Translog.Operation operation;
341-
while ((operation = snapshot.next()) != null) {
342-
if (operation.seqNo() > localCheckpoint) {
343-
localCheckpointTracker.markSeqNoAsCompleted(operation.seqNo());
344-
}
345-
}
340+
return translogRecoveryRunner.run(this, snapshot);
346341
}
347342
}
348343
}

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,8 @@ public void rollTranslogGeneration() {
344344
}
345345

346346
@Override
347-
public void restoreLocalCheckpointFromTranslog() {
347+
public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) {
348+
return 0;
348349
}
349350

350351
@Override

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -494,17 +494,16 @@ public void updateShardState(final ShardRouting newRouting,
494494
try {
495495
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
496496
/*
497-
* If this shard was serving as a replica shard when another shard was promoted to primary then the state of
498-
* its local checkpoint tracker was reset during the primary term transition. In particular, the local
499-
* checkpoint on this shard was thrown back to the global checkpoint and the state of the local checkpoint
500-
* tracker above the local checkpoint was destroyed. If the other shard that was promoted to primary
501-
* subsequently fails before the primary/replica re-sync completes successfully and we are now being
502-
* promoted, the local checkpoint tracker here could be left in a state where it would re-issue sequence
503-
* numbers. To ensure that this is not the case, we restore the state of the local checkpoint tracker by
504-
* replaying the translog and marking any operations there are completed.
497+
* If this shard was serving as a replica shard when another shard was promoted to primary then
498+
* its Lucene index was reset during the primary term transition. In particular, the Lucene index
499+
* on this shard was reset to the global checkpoint and the operations above the local checkpoint
500+
* were reverted. If the other shard that was promoted to primary subsequently fails before the
501+
* primary/replica re-sync completes successfully and we are now being promoted, we have to restore
502+
* the reverted operations on this shard by replaying the translog to avoid losing acknowledged writes.
505503
*/
506504
final Engine engine = getEngine();
507-
engine.restoreLocalCheckpointFromTranslog();
505+
engine.restoreLocalHistoryFromTranslog((resettingEngine, snapshot) ->
506+
runTranslogRecovery(resettingEngine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {}));
508507
/* Rolling the translog generation is not strictly needed here (as we will never have collisions between
509508
* sequence numbers in a translog generation in a new primary as it takes the last known sequence number
510509
* as a starting point), but it simplifies reasoning about the relationship between primary terms and
@@ -1452,9 +1451,11 @@ private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIn
14521451
} else {
14531452
if (origin == Engine.Operation.Origin.PRIMARY) {
14541453
assert assertPrimaryMode();
1455-
} else {
1456-
assert origin == Engine.Operation.Origin.REPLICA || origin == Engine.Operation.Origin.LOCAL_RESET;
1454+
} else if (origin == Engine.Operation.Origin.REPLICA) {
14571455
assert assertReplicationTarget();
1456+
} else {
1457+
assert origin == Engine.Operation.Origin.LOCAL_RESET;
1458+
assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]";
14581459
}
14591460
if (writeAllowedStates.contains(state) == false) {
14601461
throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard state is one of " + writeAllowedStates + ", origin [" + origin + "]");

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 42 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@
148148
import org.elasticsearch.index.mapper.SourceFieldMapper;
149149
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
150150
import org.elasticsearch.index.seqno.ReplicationTracker;
151+
import org.elasticsearch.index.seqno.SeqNoStats;
151152
import org.elasticsearch.index.seqno.SequenceNumbers;
152153
import org.elasticsearch.index.shard.IndexSearcherWrapper;
153154
import org.elasticsearch.index.shard.ShardId;
@@ -4047,56 +4048,52 @@ private Tuple<Long, Long> getSequenceID(Engine engine, Engine.Get get) throws En
40474048
}
40484049
}
40494050

4050-
public void testRestoreLocalCheckpointFromTranslog() throws IOException {
4051-
engine.close();
4052-
InternalEngine actualEngine = null;
4053-
try {
4054-
final Set<Long> completedSeqNos = new HashSet<>();
4055-
final BiFunction<Long, Long, LocalCheckpointTracker> supplier = (maxSeqNo, localCheckpoint) -> new LocalCheckpointTracker(
4056-
maxSeqNo,
4057-
localCheckpoint) {
4058-
@Override
4059-
public void markSeqNoAsCompleted(long seqNo) {
4060-
super.markSeqNoAsCompleted(seqNo);
4061-
completedSeqNos.add(seqNo);
4062-
}
4063-
};
4064-
trimUnsafeCommits(engine.config());
4065-
actualEngine = new InternalEngine(engine.config(), supplier);
4066-
final int operations = randomIntBetween(0, 1024);
4067-
final Set<Long> expectedCompletedSeqNos = new HashSet<>();
4068-
for (int i = 0; i < operations; i++) {
4069-
if (rarely() && i < operations - 1) {
4051+
public void testRestoreLocalHistoryFromTranslog() throws IOException {
4052+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
4053+
try (Store store = createStore()) {
4054+
final ArrayList<Long> seqNos = new ArrayList<>();
4055+
final int numOps = randomIntBetween(0, 1024);
4056+
for (int i = 0; i < numOps; i++) {
4057+
if (rarely()) {
40704058
continue;
40714059
}
4072-
expectedCompletedSeqNos.add((long) i);
4060+
seqNos.add((long) i);
40734061
}
4074-
4075-
final ArrayList<Long> seqNos = new ArrayList<>(expectedCompletedSeqNos);
40764062
Randomness.shuffle(seqNos);
4077-
for (final long seqNo : seqNos) {
4078-
final String id = Long.toString(seqNo);
4079-
final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
4080-
final Term uid = newUid(doc);
4081-
final long time = System.nanoTime();
4082-
actualEngine.index(new Engine.Index(uid, doc, seqNo, 1, 1, null, REPLICA, time, time, false));
4083-
if (rarely()) {
4084-
actualEngine.rollTranslogGeneration();
4063+
final EngineConfig engineConfig;
4064+
final SeqNoStats prevSeqNoStats;
4065+
final List<DocIdSeqNoAndTerm> prevDocs;
4066+
final int totalTranslogOps;
4067+
try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
4068+
engineConfig = engine.config();
4069+
for (final long seqNo : seqNos) {
4070+
final String id = Long.toString(seqNo);
4071+
final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
4072+
engine.index(replicaIndexForDoc(doc, 1, seqNo, false));
4073+
if (rarely()) {
4074+
engine.rollTranslogGeneration();
4075+
}
4076+
if (rarely()) {
4077+
engine.flush();
4078+
}
40854079
}
4086-
}
4087-
final long currentLocalCheckpoint = actualEngine.getLocalCheckpoint();
4088-
final long resetLocalCheckpoint =
4089-
randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Math.toIntExact(currentLocalCheckpoint));
4090-
actualEngine.getLocalCheckpointTracker().resetCheckpoint(resetLocalCheckpoint);
4091-
completedSeqNos.clear();
4092-
actualEngine.restoreLocalCheckpointFromTranslog();
4093-
final Set<Long> intersection = new HashSet<>(expectedCompletedSeqNos);
4094-
intersection.retainAll(LongStream.range(resetLocalCheckpoint + 1, operations).boxed().collect(Collectors.toSet()));
4095-
assertThat(completedSeqNos, equalTo(intersection));
4096-
assertThat(actualEngine.getLocalCheckpoint(), equalTo(currentLocalCheckpoint));
4097-
assertThat(generateNewSeqNo(actualEngine), equalTo((long) operations));
4098-
} finally {
4099-
IOUtils.close(actualEngine);
4080+
globalCheckpoint.set(randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, engine.getLocalCheckpoint()));
4081+
engine.syncTranslog();
4082+
prevSeqNoStats = engine.getSeqNoStats(globalCheckpoint.get());
4083+
prevDocs = getDocIds(engine, true);
4084+
totalTranslogOps = engine.getTranslog().totalOperations();
4085+
}
4086+
trimUnsafeCommits(engineConfig);
4087+
try (InternalEngine engine = new InternalEngine(engineConfig)) {
4088+
engine.recoverFromTranslog(translogHandler, globalCheckpoint.get());
4089+
engine.restoreLocalHistoryFromTranslog(translogHandler);
4090+
assertThat(getDocIds(engine, true), equalTo(prevDocs));
4091+
SeqNoStats seqNoStats = engine.getSeqNoStats(globalCheckpoint.get());
4092+
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(prevSeqNoStats.getLocalCheckpoint()));
4093+
assertThat(seqNoStats.getMaxSeqNo(), equalTo(prevSeqNoStats.getMaxSeqNo()));
4094+
assertThat(engine.getTranslog().totalOperations(), equalTo(totalTranslogOps));
4095+
}
4096+
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test"));
41004097
}
41014098
}
41024099

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -896,23 +896,17 @@ public void testGlobalCheckpointSync() throws IOException {
896896
closeShards(replicaShard, primaryShard);
897897
}
898898

899-
public void testRestoreLocalCheckpointTrackerFromTranslogOnPromotion() throws IOException, InterruptedException {
899+
public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, InterruptedException {
900900
final IndexShard indexShard = newStartedShard(false);
901901
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
902902
indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED));
903903

904904
final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
905-
final long globalCheckpointOnReplica = SequenceNumbers.UNASSIGNED_SEQ_NO;
906-
randomIntBetween(
907-
Math.toIntExact(SequenceNumbers.UNASSIGNED_SEQ_NO),
908-
Math.toIntExact(indexShard.getLocalCheckpoint()));
905+
final long globalCheckpointOnReplica = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
909906
indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica, "test");
910907

911-
final int globalCheckpoint =
912-
randomIntBetween(
913-
Math.toIntExact(SequenceNumbers.UNASSIGNED_SEQ_NO),
914-
Math.toIntExact(indexShard.getLocalCheckpoint()));
915-
908+
final long globalCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
909+
final Set<String> docsBeforeRollback = getShardDocUIDs(indexShard);
916910
final CountDownLatch latch = new CountDownLatch(1);
917911
indexShard.acquireReplicaOperationPermit(
918912
indexShard.getPendingPrimaryTerm() + 1,
@@ -946,6 +940,7 @@ public void onFailure(Exception e) {
946940
resyncLatch.await();
947941
assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo));
948942
assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo));
943+
assertThat(getShardDocUIDs(indexShard), equalTo(docsBeforeRollback));
949944
closeShard(indexShard, false);
950945
}
951946

0 commit comments

Comments
 (0)