Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,9 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
assert indexShouldExists;
store.bootstrapNewHistory();
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPendingPrimaryTerm());
indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
} else if (indexShouldExists) {
if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) {
Expand Down Expand Up @@ -466,9 +466,9 @@ private void restore(final IndexShard indexShard, final Repository repository, f
final Store store = indexShard.store();
store.bootstrapNewHistory();
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPendingPrimaryTerm());
indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
indexShard.openEngineAndRecoverFromTranslog();
Expand Down
11 changes: 6 additions & 5 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -1428,26 +1428,27 @@ public void bootstrapNewHistory() throws IOException {
try {
Map<String, String> userData = readLastCommittedSegmentsInfo().getUserData();
final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO));
bootstrapNewHistory(maxSeqNo);
final long localCheckpoint = Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
bootstrapNewHistory(localCheckpoint, maxSeqNo);
} finally {
metadataLock.writeLock().unlock();
}
}

/**
* Marks an existing lucene index with a new history uuid and sets the given maxSeqNo as the local checkpoint
* Marks an existing lucene index with a new history uuid and sets the given local checkpoint
* as well as the maximum sequence number.
* This is used to make sure no existing shard will recovery from this index using ops based recovery.
* This is used to make sure no existing shard will recover from this index using ops based recovery.
* @see SequenceNumbers#LOCAL_CHECKPOINT_KEY
* @see SequenceNumbers#MAX_SEQ_NO
*/
public void bootstrapNewHistory(long maxSeqNo) throws IOException {
public void bootstrapNewHistory(long localCheckpoint, long maxSeqNo) throws IOException {
metadataLock.writeLock().lock();
try (IndexWriter writer = newAppendingIndexWriter(directory, null)) {
final Map<String, String> map = new HashMap<>();
map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
updateCommitData(writer, map);
} finally {
metadataLock.writeLock().unlock();
Expand Down
131 changes: 12 additions & 119 deletions server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
Expand All @@ -46,8 +45,6 @@
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
Expand All @@ -62,7 +59,6 @@
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
Expand Down Expand Up @@ -107,7 +103,6 @@
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.index.store.StoreUtils;
Expand All @@ -121,12 +116,8 @@
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;
import org.elasticsearch.test.CorruptionUtils;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.FieldMaskingReader;
Expand All @@ -143,7 +134,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -174,7 +164,6 @@
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN;
import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -2159,9 +2148,12 @@ public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedExc

public void testRestoreShard() throws IOException {
final IndexShard source = newStartedShard(true);
IndexShard target = newStartedShard(true);
IndexShard target = newStartedShard(true, Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), source.indexSettings().isSoftDeleteEnabled()).build());

indexDoc(source, "_doc", "0");
EngineTestCase.generateNewSeqNo(source.getEngine()); // create a gap in the history
indexDoc(source, "_doc", "2");
if (randomBoolean()) {
source.refresh("test");
}
Expand Down Expand Up @@ -2197,16 +2189,18 @@ public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version versio
}
}
}));
assertThat(target.getLocalCheckpoint(), equalTo(0L));
assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(0L));
assertThat(target.getReplicationTracker().getGlobalCheckpoint(), equalTo(0L));
assertThat(target.getLocalCheckpoint(), equalTo(2L));
assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(2L));
assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L));
IndexShardTestCase.updateRoutingEntry(target, routing.moveToStarted());
assertThat(target.getReplicationTracker().getTrackedLocalCheckpointForShard(
target.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(0L));
target.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(2L));
assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(2L));

assertDocs(target, "0");
assertDocs(target, "0", "2");

closeShards(source, target);
closeShard(source, false);
closeShards(target);
}

public void testSearcherWrapperIsUsed() throws IOException {
Expand Down Expand Up @@ -3131,107 +3125,6 @@ private Result indexOnReplicaWithGaps(
return new Result(localCheckpoint, max);
}

/** A dummy repository for testing which just needs restore overridden */
private abstract static class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository {
private final String indexName;

RestoreOnlyRepository(String indexName) {
this.indexName = indexName;
}

@Override
protected void doStart() {
}

@Override
protected void doStop() {
}

@Override
protected void doClose() {
}

@Override
public RepositoryMetaData getMetadata() {
return null;
}

@Override
public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
return null;
}

@Override
public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) {
return null;
}

@Override
public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException {
return null;
}

@Override
public RepositoryData getRepositoryData() {
Map<IndexId, Set<SnapshotId>> map = new HashMap<>();
map.put(new IndexId(indexName, "blah"), emptySet());
return new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), map, Collections.emptyList());
}

@Override
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData) {
}

@Override
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState) {
return null;
}

@Override
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
}

@Override
public long getSnapshotThrottleTimeInNanos() {
return 0;
}

@Override
public long getRestoreThrottleTimeInNanos() {
return 0;
}

@Override
public String startVerification() {
return null;
}

@Override
public void endVerification(String verificationToken) {
}

@Override
public boolean isReadOnly() {
return false;
}

@Override
public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus) {
}

@Override
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
return null;
}

@Override
public void verify(String verificationToken, DiscoveryNode localNode) {
}
}

public void testIsSearchIdle() throws Exception {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
Expand Down
Loading