From 3c99ab364c11c86922eaa8a6bc694c9b9492c3f5 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 11 Jan 2018 17:15:10 -0500 Subject: [PATCH 1/4] Truncate tlog cli should assign global checkpoint We are targeting to always have a safe index once the recovery is done. This invariant does not hold if the translog is manually truncated by users because the truncate translog cli resets the global checkpoint to unassigned. This commit assigns the max_seqno of the last commit to the global checkpoint when truncating translog. Relates #28181 --- .../translog/TruncateTranslogCommand.java | 13 ++++++++++--- .../index/translog/TruncateTranslogIT.java | 18 ++++++++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java index d9b77f841ed09..3039dbd8d2905 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java @@ -135,6 +135,13 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th commitData = commits.get(commits.size() - 1).getUserData(); String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY); String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY); + final long globalCheckpoint; + // In order to have a safe commit invariant, we have to assign max_seqno of the last commit to the global checkpoint. 
+ if (commitData.containsKey(SequenceNumbers.MAX_SEQ_NO)) { + globalCheckpoint = Long.parseLong(commitData.get(SequenceNumbers.MAX_SEQ_NO)); + } else { + globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; + } if (translogGeneration == null || translogUUID == null) { throw new ElasticsearchException("shard must have a valid translog generation and UUID but got: [{}] and: [{}]", translogGeneration, translogUUID); @@ -153,7 +160,7 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th // Write empty checkpoint and translog to empty files long gen = Long.parseLong(translogGeneration); int translogLen = writeEmptyTranslog(tempEmptyTranslog, translogUUID); - writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen); + writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen, globalCheckpoint); terminal.println("Removing existing translog files"); IOUtils.rm(translogFiles.toArray(new Path[]{})); @@ -190,9 +197,9 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th } /** Write a checkpoint file to the given location with the given generation */ - public static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration) throws IOException { + static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration, long globalCheckpoint) throws IOException { Checkpoint emptyCheckpoint = Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration, - SequenceNumbers.UNASSIGNED_SEQ_NO, translogGeneration); + globalCheckpoint, translogGeneration); Checkpoint.write(FileChannel::open, filename, emptyCheckpoint, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW); // fsync with metadata here to make sure. 
diff --git a/server/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java b/server/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java index d98359cdd06a0..471bd3aa1b197 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java @@ -31,6 +31,7 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequestBuilder; @@ -48,6 +49,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MockEngineFactoryPlugin; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -73,7 +75,9 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0) @@ -146,6 +150,7 @@ public void testCorruptTranslogTruncation() throws Exception { replica.flush(new FlushRequest()); logger.info("--> performed extra flushing on replica"); } + final SeqNoStats oldSeqNoStats = getSeqNoStats("test", 0); // 
shut down the replica node to be tested later internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode)); @@ -214,6 +219,9 @@ public void testCorruptTranslogTruncation() throws Exception { final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get("test").stream() .filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get(); assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0)); + // Ensure that the global checkpoint is restored from the max seqno of the last commit. + final SeqNoStats seqNoStats = getSeqNoStats("test", 0); + assertThat(seqNoStats.getGlobalCheckpoint(), allOf(equalTo(seqNoStats.getMaxSeqNo()), equalTo(oldSeqNoStats.getMaxSeqNo()))); } public void testCorruptTranslogTruncationOfReplica() throws Exception { @@ -261,6 +269,7 @@ public void testCorruptTranslogTruncationOfReplica() throws Exception { final ShardId shardId = new ShardId(resolveIndex("test"), 0); Set translogDirs = getTranslogDirs(replicaNode, shardId); + final SeqNoStats oldSeqNoStats = getSeqNoStats("test", 0); // stop the cluster nodes. we don't use full restart so the node start up order will be the same // and shard roles will be maintained internalCluster().stopRandomDataNode(); @@ -316,6 +325,9 @@ public void testCorruptTranslogTruncationOfReplica() throws Exception { .filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get(); // the replica translog was disabled so it doesn't know what hte global checkpoint is and thus can't do ops based recovery assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0)); + // Ensure that the global checkpoint is restored from the max seqno of the last commit. 
+ final SeqNoStats seqNoStats = getSeqNoStats("test", 0); + assertThat(seqNoStats.getGlobalCheckpoint(), allOf(equalTo(seqNoStats.getMaxSeqNo()), equalTo(oldSeqNoStats.getMaxSeqNo()))); } private Set getTranslogDirs(String indexName) throws IOException { @@ -360,4 +372,10 @@ private static void disableTranslogFlush(String index) { client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get(); } + private SeqNoStats getSeqNoStats(String index, int shardId) { + final ShardStats[] shardStats = client().admin().indices() + .prepareStats(index).get() + .getIndices().get(index).getShards(); + return shardStats[shardId].getSeqNoStats(); + } } From f6aa36b32a088d59cded18f2ef1e9b7c856c42d9 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 11 Jan 2018 23:44:35 -0500 Subject: [PATCH 2/4] Test only the latest values --- .../elasticsearch/index/translog/TruncateTranslogIT.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java b/server/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java index 471bd3aa1b197..0eb6e5e3b59d9 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java @@ -75,7 +75,6 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -150,7 +149,6 @@ public void testCorruptTranslogTruncation() throws Exception { replica.flush(new FlushRequest()); logger.info("--> performed extra flushing on replica"); } - final SeqNoStats 
oldSeqNoStats = getSeqNoStats("test", 0); // shut down the replica node to be tested later internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode)); @@ -221,7 +219,7 @@ public void testCorruptTranslogTruncation() throws Exception { assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0)); // Ensure that the global checkpoint is restored from the max seqno of the last commit. final SeqNoStats seqNoStats = getSeqNoStats("test", 0); - assertThat(seqNoStats.getGlobalCheckpoint(), allOf(equalTo(seqNoStats.getMaxSeqNo()), equalTo(oldSeqNoStats.getMaxSeqNo()))); + assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo())); } public void testCorruptTranslogTruncationOfReplica() throws Exception { @@ -269,7 +267,6 @@ public void testCorruptTranslogTruncationOfReplica() throws Exception { final ShardId shardId = new ShardId(resolveIndex("test"), 0); Set translogDirs = getTranslogDirs(replicaNode, shardId); - final SeqNoStats oldSeqNoStats = getSeqNoStats("test", 0); // stop the cluster nodes. we don't use full restart so the node start up order will be the same // and shard roles will be maintained internalCluster().stopRandomDataNode(); @@ -327,7 +324,7 @@ public void testCorruptTranslogTruncationOfReplica() throws Exception { assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0)); // Ensure that the global checkpoint is restored from the max seqno of the last commit. 
final SeqNoStats seqNoStats = getSeqNoStats("test", 0); - assertThat(seqNoStats.getGlobalCheckpoint(), allOf(equalTo(seqNoStats.getMaxSeqNo()), equalTo(oldSeqNoStats.getMaxSeqNo()))); + assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo())); } private Set getTranslogDirs(String indexName) throws IOException { From 817eb348275f269702c00e46c544eb7611093684 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 12 Jan 2018 15:48:17 -0500 Subject: [PATCH 3/4] Also advances the local checkpoint --- .../index/translog/TruncateTranslogCommand.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java index 3039dbd8d2905..222e3e13d65e1 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java @@ -132,13 +132,16 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th } // Retrieve the generation and UUID from the existing data - commitData = commits.get(commits.size() - 1).getUserData(); + commitData = new HashMap<>(commits.get(commits.size() - 1).getUserData()); String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY); String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY); final long globalCheckpoint; - // In order to have a safe commit invariant, we have to assign max_seqno of the last commit to the global checkpoint. + // In order to have a safe commit invariant, we have to assign the global checkpoint to the max_seqno of the last commit. + // We can only safely do it because we will generate a new history uuid for this shard. 
if (commitData.containsKey(SequenceNumbers.MAX_SEQ_NO)) { globalCheckpoint = Long.parseLong(commitData.get(SequenceNumbers.MAX_SEQ_NO)); + // Also advances the local checkpoint of the last commit to its max_seqno. + commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(globalCheckpoint)); } else { globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; } From 25494428dd55d4ad82d5880ed2f41e50bd8720a0 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 12 Jan 2018 16:23:39 -0500 Subject: [PATCH 4/4] Asserts the local checkpoint --- .../elasticsearch/index/translog/TruncateTranslogIT.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java b/server/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java index 0eb6e5e3b59d9..029ed50fb2851 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java @@ -217,9 +217,10 @@ public void testCorruptTranslogTruncation() throws Exception { final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get("test").stream() .filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get(); assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0)); - // Ensure that the global checkpoint is restored from the max seqno of the last commit. + // Ensure that the global checkpoint and local checkpoint are restored from the max seqno of the last commit. 
final SeqNoStats seqNoStats = getSeqNoStats("test", 0); assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo())); + assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo())); } public void testCorruptTranslogTruncationOfReplica() throws Exception { @@ -322,9 +323,10 @@ public void testCorruptTranslogTruncationOfReplica() throws Exception { .filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get(); // the replica translog was disabled so it doesn't know what hte global checkpoint is and thus can't do ops based recovery assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0)); - // Ensure that the global checkpoint is restored from the max seqno of the last commit. + // Ensure that the global checkpoint and local checkpoint are restored from the max seqno of the last commit. final SeqNoStats seqNoStats = getSeqNoStats("test", 0); assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo())); + assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo())); } private Set getTranslogDirs(String indexName) throws IOException {