Skip to content

Commit f2db2a0

Browse files
authored
Truncate tlog cli should assign global checkpoint (#28192)
We are targeting to always have a safe index once the recovery is done. This invariant does not hold if the translog is manually truncated by users because the truncate translog cli resets the global checkpoint to unassigned. This commit assigns the global checkpoint to the max_seqno of the last commit when truncating translog. We can only safely do it because the truncate translog command will generate a new history uuid for that shard. With a new history UUID, sequence-based recovery between that shard and other old shards will be disabled. Relates #28181
1 parent a15ba75 commit f2db2a0

File tree

2 files changed

+31
-4
lines changed

2 files changed

+31
-4
lines changed

server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,19 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th
132132
}
133133

134134
// Retrieve the generation and UUID from the existing data
135-
commitData = commits.get(commits.size() - 1).getUserData();
135+
commitData = new HashMap<>(commits.get(commits.size() - 1).getUserData());
136136
String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY);
137137
String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY);
138+
final long globalCheckpoint;
139+
// In order to have a safe commit invariant, we have to assign the global checkpoint to the max_seqno of the last commit.
140+
// We can only safely do it because we will generate a new history uuid this shard.
141+
if (commitData.containsKey(SequenceNumbers.MAX_SEQ_NO)) {
142+
globalCheckpoint = Long.parseLong(commitData.get(SequenceNumbers.MAX_SEQ_NO));
143+
// Also advances the local checkpoint of the last commit to its max_seqno.
144+
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(globalCheckpoint));
145+
} else {
146+
globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
147+
}
138148
if (translogGeneration == null || translogUUID == null) {
139149
throw new ElasticsearchException("shard must have a valid translog generation and UUID but got: [{}] and: [{}]",
140150
translogGeneration, translogUUID);
@@ -153,7 +163,7 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th
153163
// Write empty checkpoint and translog to empty files
154164
long gen = Long.parseLong(translogGeneration);
155165
int translogLen = writeEmptyTranslog(tempEmptyTranslog, translogUUID);
156-
writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen);
166+
writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen, globalCheckpoint);
157167

158168
terminal.println("Removing existing translog files");
159169
IOUtils.rm(translogFiles.toArray(new Path[]{}));
@@ -190,9 +200,9 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th
190200
}
191201

192202
/** Write a checkpoint file to the given location with the given generation */
193-
public static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration) throws IOException {
203+
static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration, long globalCheckpoint) throws IOException {
194204
Checkpoint emptyCheckpoint = Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration,
195-
SequenceNumbers.UNASSIGNED_SEQ_NO, translogGeneration);
205+
globalCheckpoint, translogGeneration);
196206
Checkpoint.write(FileChannel::open, filename, emptyCheckpoint,
197207
StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
198208
// fsync with metadata here to make sure.

server/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
3232
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
3333
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
34+
import org.elasticsearch.action.admin.indices.stats.ShardStats;
3435
import org.elasticsearch.action.index.IndexRequestBuilder;
3536
import org.elasticsearch.action.search.SearchPhaseExecutionException;
3637
import org.elasticsearch.action.search.SearchRequestBuilder;
@@ -48,6 +49,7 @@
4849
import org.elasticsearch.index.Index;
4950
import org.elasticsearch.index.IndexSettings;
5051
import org.elasticsearch.index.MockEngineFactoryPlugin;
52+
import org.elasticsearch.index.seqno.SeqNoStats;
5153
import org.elasticsearch.index.shard.IndexShard;
5254
import org.elasticsearch.index.shard.ShardId;
5355
import org.elasticsearch.indices.IndicesService;
@@ -74,6 +76,7 @@
7476
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
7577
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
7678
import static org.hamcrest.Matchers.containsString;
79+
import static org.hamcrest.Matchers.equalTo;
7780
import static org.hamcrest.Matchers.greaterThan;
7881

7982
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0)
@@ -214,6 +217,10 @@ public void testCorruptTranslogTruncation() throws Exception {
214217
final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get("test").stream()
215218
.filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get();
216219
assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0));
220+
// Ensure that the global checkpoint and local checkpoint are restored from the max seqno of the last commit.
221+
final SeqNoStats seqNoStats = getSeqNoStats("test", 0);
222+
assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
223+
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
217224
}
218225

219226
public void testCorruptTranslogTruncationOfReplica() throws Exception {
@@ -316,6 +323,10 @@ public void testCorruptTranslogTruncationOfReplica() throws Exception {
316323
.filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get();
317324
// the replica translog was disabled so it doesn't know what hte global checkpoint is and thus can't do ops based recovery
318325
assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0));
326+
// Ensure that the global checkpoint and local checkpoint are restored from the max seqno of the last commit.
327+
final SeqNoStats seqNoStats = getSeqNoStats("test", 0);
328+
assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
329+
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
319330
}
320331

321332
private Set<Path> getTranslogDirs(String indexName) throws IOException {
@@ -360,4 +371,10 @@ private static void disableTranslogFlush(String index) {
360371
client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
361372
}
362373

374+
private SeqNoStats getSeqNoStats(String index, int shardId) {
375+
final ShardStats[] shardStats = client().admin().indices()
376+
.prepareStats(index).get()
377+
.getIndices().get(index).getShards();
378+
return shardStats[shardId].getSeqNoStats();
379+
}
363380
}

0 commit comments

Comments
 (0)