Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,19 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th
}

// Retrieve the generation and UUID from the existing data
commitData = commits.get(commits.size() - 1).getUserData();
commitData = new HashMap<>(commits.get(commits.size() - 1).getUserData());
String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY);
String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint;
// In order to have a safe commit invariant, we have to assign the global checkpoint to the max_seqno of the last commit.
// We can only safely do it because we will generate a new history uuid this shard.
if (commitData.containsKey(SequenceNumbers.MAX_SEQ_NO)) {
globalCheckpoint = Long.parseLong(commitData.get(SequenceNumbers.MAX_SEQ_NO));
// Also advances the local checkpoint of the last commit to its max_seqno.
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(globalCheckpoint));
} else {
globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
if (translogGeneration == null || translogUUID == null) {
throw new ElasticsearchException("shard must have a valid translog generation and UUID but got: [{}] and: [{}]",
translogGeneration, translogUUID);
Expand All @@ -153,7 +163,7 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th
// Write empty checkpoint and translog to empty files
long gen = Long.parseLong(translogGeneration);
int translogLen = writeEmptyTranslog(tempEmptyTranslog, translogUUID);
writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen);
writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen, globalCheckpoint);

terminal.println("Removing existing translog files");
IOUtils.rm(translogFiles.toArray(new Path[]{}));
Expand Down Expand Up @@ -190,9 +200,9 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th
}

/** Write a checkpoint file to the given location with the given generation */
public static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration) throws IOException {
static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration, long globalCheckpoint) throws IOException {
Checkpoint emptyCheckpoint = Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration,
SequenceNumbers.UNASSIGNED_SEQ_NO, translogGeneration);
globalCheckpoint, translogGeneration);
Checkpoint.write(FileChannel::open, filename, emptyCheckpoint,
StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
// fsync with metadata here to make sure.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
Expand All @@ -48,6 +49,7 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MockEngineFactoryPlugin;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
Expand All @@ -74,6 +76,7 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0)
Expand Down Expand Up @@ -214,6 +217,10 @@ public void testCorruptTranslogTruncation() throws Exception {
final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get("test").stream()
.filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get();
assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0));
// Ensure that the global checkpoint and local checkpoint are restored from the max seqno of the last commit.
final SeqNoStats seqNoStats = getSeqNoStats("test", 0);
assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
}

public void testCorruptTranslogTruncationOfReplica() throws Exception {
Expand Down Expand Up @@ -316,6 +323,10 @@ public void testCorruptTranslogTruncationOfReplica() throws Exception {
.filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get();
// the replica translog was disabled so it doesn't know what hte global checkpoint is and thus can't do ops based recovery
assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0));
// Ensure that the global checkpoint and local checkpoint are restored from the max seqno of the last commit.
final SeqNoStats seqNoStats = getSeqNoStats("test", 0);
assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
}

private Set<Path> getTranslogDirs(String indexName) throws IOException {
Expand Down Expand Up @@ -360,4 +371,10 @@ private static void disableTranslogFlush(String index) {
client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
}

private SeqNoStats getSeqNoStats(String index, int shardId) {
final ShardStats[] shardStats = client().admin().indices()
.prepareStats(index).get()
.getIndices().get(index).getShards();
return shardStats[shardId].getSeqNoStats();
}
}