@@ -111,7 +111,6 @@ protected void doRun() throws Exception {
         return future;
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/33616")
     public void testRecoveryWithConcurrentIndexing() throws Exception {
         final String index = "recovery_with_concurrent_indexing";
         Response response = client().performRequest(new Request("GET", "_nodes"));
@@ -184,7 +183,6 @@ private String getNodeId(Predicate<Version> versionPredicate) throws IOException
     }
 
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/33616")
     public void testRelocationWithConcurrentIndexing() throws Exception {
         final String index = "relocation_with_concurrent_indexing";
         switch (CLUSTER_TYPE) {
@@ -1720,12 +1720,12 @@ public interface Warmer {
     public abstract void deactivateThrottling();
 
     /**
-     * Marks operations in the translog as completed. This is used to restore the state of the local checkpoint tracker on primary
-     * promotion.
+     * This method replays translog to restore the Lucene index which might be reverted previously.
+     * This ensures that all acknowledged writes are restored correctly when this engine is promoted.
      *
-     * @throws IOException if an I/O exception occurred reading the translog
+     * @return the number of translog operations have been recovered
      */
-    public abstract void restoreLocalCheckpointFromTranslog() throws IOException;
+    public abstract int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) throws IOException;
 
     /**
      * Fills up the local checkpoints history with no-ops until the local checkpoint
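For reference, the TranslogRecoveryRunner type taken by the new method is not shown in this diff; it is presumably a small callback interface in Engine along these lines (a sketch of the assumed shape, not a line from this change):

    // Assumed shape: a callback that applies the operations in a translog snapshot to the
    // given engine and returns how many operations it processed.
    public interface TranslogRecoveryRunner {
        int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
    }

The InternalEngine implementation below simply opens a snapshot of everything above the local checkpoint and hands it to this runner.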
@@ -332,17 +332,12 @@ protected int getRefCount(IndexSearcher reference) {
     }
 
     @Override
-    public void restoreLocalCheckpointFromTranslog() throws IOException {
-        try (ReleasableLock ignored = writeLock.acquire()) {
+    public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) throws IOException {
+        try (ReleasableLock ignored = readLock.acquire()) {
             ensureOpen();
             final long localCheckpoint = localCheckpointTracker.getCheckpoint();
             try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) {
-                Translog.Operation operation;
-                while ((operation = snapshot.next()) != null) {
-                    if (operation.seqNo() > localCheckpoint) {
-                        localCheckpointTracker.markSeqNoAsCompleted(operation.seqNo());
-                    }
-                }
+                return translogRecoveryRunner.run(this, snapshot);
[Inline review comment from @dnhatn (Member, PR author), Sep 12, 2018, on the line above:]
We can keep track of a max_seqno from the translog to recover to when we roll back this engine (i.e., recover_upto and the max_seqno in the translog at that time), then only restore if needed. However, I opted out of that for simplicity.
             }
         }
     }
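A rough sketch of the alternative mentioned in @dnhatn's review comment above: remember the highest sequence number that was present when the engine was rolled back, and skip the replay when nothing above the local checkpoint was actually reverted. The field and methods here are hypothetical and are not part of this change; they only illustrate the idea that was deliberately left out for simplicity.

    // Hypothetical optimization (not implemented in this PR): only replay the translog when
    // the rollback actually discarded operations above the current local checkpoint.
    private long maxSeqNoAtRollback = SequenceNumbers.NO_OPS_PERFORMED;   // hypothetical field

    void recordRollback(long maxSeqNoBeforeRollback) {                    // hypothetical hook invoked when the engine is rolled back
        this.maxSeqNoAtRollback = maxSeqNoBeforeRollback;
    }

    int restoreLocalHistoryIfNeeded(TranslogRecoveryRunner runner) throws IOException {
        final long localCheckpoint = localCheckpointTracker.getCheckpoint();
        if (maxSeqNoAtRollback <= localCheckpoint) {
            return 0; // nothing above the local checkpoint was reverted, so there is nothing to replay
        }
        try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) {
            return runner.run(this, snapshot);
        }
    }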
@@ -344,7 +344,8 @@ public void rollTranslogGeneration() {
     }
 
     @Override
-    public void restoreLocalCheckpointFromTranslog() {
+    public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) {
+        return 0;
     }
 
     @Override
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java (23 changes: 12 additions & 11 deletions)

@@ -494,17 +494,16 @@ public void updateShardState(final ShardRouting newRouting,
            try {
                replicationTracker.activatePrimaryMode(getLocalCheckpoint());
                /*
-                * If this shard was serving as a replica shard when another shard was promoted to primary then the state of
-                * its local checkpoint tracker was reset during the primary term transition. In particular, the local
-                * checkpoint on this shard was thrown back to the global checkpoint and the state of the local checkpoint
-                * tracker above the local checkpoint was destroyed. If the other shard that was promoted to primary
-                * subsequently fails before the primary/replica re-sync completes successfully and we are now being
-                * promoted, the local checkpoint tracker here could be left in a state where it would re-issue sequence
-                * numbers. To ensure that this is not the case, we restore the state of the local checkpoint tracker by
-                * replaying the translog and marking any operations there are completed.
+                * If this shard was serving as a replica shard when another shard was promoted to primary then
+                * its Lucene index was reset during the primary term transition. In particular, the Lucene index
+                * on this shard was reset to the global checkpoint and the operations above the local checkpoint
+                * were reverted. If the other shard that was promoted to primary subsequently fails before the
+                * primary/replica re-sync completes successfully and we are now being promoted, we have to restore
+                * the reverted operations on this shard by replaying the translog to avoid losing acknowledged writes.
                 */
                final Engine engine = getEngine();
-               engine.restoreLocalCheckpointFromTranslog();
+               engine.restoreLocalHistoryFromTranslog((resettingEngine, snapshot) ->
+                   runTranslogRecovery(resettingEngine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {}));
                /* Rolling the translog generation is not strictly needed here (as we will never have collisions between
                 * sequence numbers in a translog generation in a new primary as it takes the last known sequence number
                 * as a starting point), but it simplifies reasoning about the relationship between primary terms and
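For readers less familiar with the sequence number machinery, here is a tiny self-contained illustration of the scenario the rewritten comment describes. The class, the plain lists, and the concrete numbers are invented purely for illustration and are not part of this change or of the Elasticsearch classes shown here.

    import java.util.ArrayList;
    import java.util.List;

    // Illustration only: acknowledged writes 0..10, global checkpoint 5. On the primary term
    // transition the replica reverts its index to the global checkpoint; if that replica is
    // later promoted, replaying the translog restores the reverted but acknowledged operations.
    public class LocalResetScenario {
        public static void main(String[] args) {
            final long globalCheckpoint = 5;
            final List<Long> translog = new ArrayList<>(); // durable record of every acknowledged write
            final List<Long> index = new ArrayList<>();    // what the shard currently exposes
            for (long seqNo = 0; seqNo <= 10; seqNo++) {
                translog.add(seqNo);
                index.add(seqNo);
            }

            // Rollback on primary term transition: drop everything above the global checkpoint.
            index.removeIf(seqNo -> seqNo > globalCheckpoint);

            // Promotion: replay the translog so the acknowledged writes 6..10 reappear.
            int restored = 0;
            for (long seqNo : translog) {
                if (index.contains(seqNo) == false) {
                    index.add(seqNo);
                    restored++;
                }
            }
            System.out.println("restored " + restored + " acknowledged operations");
        }
    }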
@@ -1452,9 +1451,11 @@ private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIndexShardStateException {
        } else {
            if (origin == Engine.Operation.Origin.PRIMARY) {
                assert assertPrimaryMode();
-           } else {
-               assert origin == Engine.Operation.Origin.REPLICA || origin == Engine.Operation.Origin.LOCAL_RESET;
+           } else if (origin == Engine.Operation.Origin.REPLICA) {
                assert assertReplicationTarget();
+           } else {
+               assert origin == Engine.Operation.Origin.LOCAL_RESET;
+               assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]";
            }
            if (writeAllowedStates.contains(state) == false) {
                throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard state is one of " + writeAllowedStates + ", origin [" + origin + "]");
@@ -148,6 +148,7 @@
 import org.elasticsearch.index.mapper.SourceFieldMapper;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
 import org.elasticsearch.index.seqno.ReplicationTracker;
+import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexSearcherWrapper;
 import org.elasticsearch.index.shard.ShardId;
@@ -4047,56 +4048,52 @@ private Tuple<Long, Long> getSequenceID(Engine engine, Engine.Get get) throws En
         }
     }
 
-    public void testRestoreLocalCheckpointFromTranslog() throws IOException {
-        engine.close();
-        InternalEngine actualEngine = null;
-        try {
-            final Set<Long> completedSeqNos = new HashSet<>();
-            final BiFunction<Long, Long, LocalCheckpointTracker> supplier = (maxSeqNo, localCheckpoint) -> new LocalCheckpointTracker(
-                maxSeqNo,
-                localCheckpoint) {
-                @Override
-                public void markSeqNoAsCompleted(long seqNo) {
-                    super.markSeqNoAsCompleted(seqNo);
-                    completedSeqNos.add(seqNo);
-                }
-            };
-            trimUnsafeCommits(engine.config());
-            actualEngine = new InternalEngine(engine.config(), supplier);
-            final int operations = randomIntBetween(0, 1024);
-            final Set<Long> expectedCompletedSeqNos = new HashSet<>();
-            for (int i = 0; i < operations; i++) {
-                if (rarely() && i < operations - 1) {
+    public void testRestoreLocalHistoryFromTranslog() throws IOException {
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+        try (Store store = createStore()) {
+            final ArrayList<Long> seqNos = new ArrayList<>();
+            final int numOps = randomIntBetween(0, 1024);
+            for (int i = 0; i < numOps; i++) {
+                if (rarely()) {
                     continue;
                 }
-                expectedCompletedSeqNos.add((long) i);
+                seqNos.add((long) i);
             }
 
-            final ArrayList<Long> seqNos = new ArrayList<>(expectedCompletedSeqNos);
             Randomness.shuffle(seqNos);
-            for (final long seqNo : seqNos) {
-                final String id = Long.toString(seqNo);
-                final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
-                final Term uid = newUid(doc);
-                final long time = System.nanoTime();
-                actualEngine.index(new Engine.Index(uid, doc, seqNo, 1, 1, null, REPLICA, time, time, false));
-                if (rarely()) {
-                    actualEngine.rollTranslogGeneration();
+            final EngineConfig engineConfig;
+            final SeqNoStats prevSeqNoStats;
+            final List<DocIdSeqNoAndTerm> prevDocs;
+            final int totalTranslogOps;
+            try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
+                engineConfig = engine.config();
+                for (final long seqNo : seqNos) {
+                    final String id = Long.toString(seqNo);
+                    final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
+                    engine.index(replicaIndexForDoc(doc, 1, seqNo, false));
+                    if (rarely()) {
+                        engine.rollTranslogGeneration();
+                    }
+                    if (rarely()) {
+                        engine.flush();
+                    }
                 }
-            }
-            final long currentLocalCheckpoint = actualEngine.getLocalCheckpoint();
-            final long resetLocalCheckpoint =
-                randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Math.toIntExact(currentLocalCheckpoint));
-            actualEngine.getLocalCheckpointTracker().resetCheckpoint(resetLocalCheckpoint);
-            completedSeqNos.clear();
-            actualEngine.restoreLocalCheckpointFromTranslog();
-            final Set<Long> intersection = new HashSet<>(expectedCompletedSeqNos);
-            intersection.retainAll(LongStream.range(resetLocalCheckpoint + 1, operations).boxed().collect(Collectors.toSet()));
-            assertThat(completedSeqNos, equalTo(intersection));
-            assertThat(actualEngine.getLocalCheckpoint(), equalTo(currentLocalCheckpoint));
-            assertThat(generateNewSeqNo(actualEngine), equalTo((long) operations));
-        } finally {
-            IOUtils.close(actualEngine);
+                globalCheckpoint.set(randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, engine.getLocalCheckpoint()));
+                engine.syncTranslog();
+                prevSeqNoStats = engine.getSeqNoStats(globalCheckpoint.get());
+                prevDocs = getDocIds(engine, true);
+                totalTranslogOps = engine.getTranslog().totalOperations();
             }
+            trimUnsafeCommits(engineConfig);
+            try (InternalEngine engine = new InternalEngine(engineConfig)) {
+                engine.recoverFromTranslog(translogHandler, globalCheckpoint.get());
+                engine.restoreLocalHistoryFromTranslog(translogHandler);
+                assertThat(getDocIds(engine, true), equalTo(prevDocs));
+                SeqNoStats seqNoStats = engine.getSeqNoStats(globalCheckpoint.get());
+                assertThat(seqNoStats.getLocalCheckpoint(), equalTo(prevSeqNoStats.getLocalCheckpoint()));
+                assertThat(seqNoStats.getMaxSeqNo(), equalTo(prevSeqNoStats.getMaxSeqNo()));
+                assertThat(engine.getTranslog().totalOperations(), equalTo(totalTranslogOps));
+            }
+            assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test"));
         }
+    }
 
@@ -896,23 +896,17 @@ public void testGlobalCheckpointSync() throws IOException {
         closeShards(replicaShard, primaryShard);
     }
 
-    public void testRestoreLocalCheckpointTrackerFromTranslogOnPromotion() throws IOException, InterruptedException {
+    public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, InterruptedException {
         final IndexShard indexShard = newStartedShard(false);
         final int operations = 1024 - scaledRandomIntBetween(0, 1024);
         indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED));
 
         final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
-        final long globalCheckpointOnReplica = SequenceNumbers.UNASSIGNED_SEQ_NO;
-        randomIntBetween(
-            Math.toIntExact(SequenceNumbers.UNASSIGNED_SEQ_NO),
-            Math.toIntExact(indexShard.getLocalCheckpoint()));
+        final long globalCheckpointOnReplica = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
         indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica, "test");
 
-        final int globalCheckpoint =
-            randomIntBetween(
-                Math.toIntExact(SequenceNumbers.UNASSIGNED_SEQ_NO),
-                Math.toIntExact(indexShard.getLocalCheckpoint()));
-
+        final long globalCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
+        final Set<String> docsBeforeRollback = getShardDocUIDs(indexShard);
         final CountDownLatch latch = new CountDownLatch(1);
         indexShard.acquireReplicaOperationPermit(
             indexShard.getPendingPrimaryTerm() + 1,
@@ -946,6 +940,7 @@ public void onFailure(Exception e) {
         resyncLatch.await();
         assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo));
         assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo));
+        assertThat(getShardDocUIDs(indexShard), equalTo(docsBeforeRollback));
         closeShard(indexShard, false);
     }
 