From 34db798e280af28665260797ae0f3d77921081f2 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 29 Apr 2019 20:10:58 -0400 Subject: [PATCH 1/8] Peer recovery should flush at the end --- .../indices/recovery/RecoveryTarget.java | 17 ++++++++ .../indices/recovery/IndexRecoveryIT.java | 41 +++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index a27ac8a352a32..7ab7470b033ab 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -28,6 +28,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; @@ -303,11 +304,27 @@ public void finalizeRecovery(final long globalCheckpoint, ActionListener l // Persist the global checkpoint. indexShard.sync(); indexShard.persistRetentionLeases(); + if (hasUncommittedOperations()) { + indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true)); + } indexShard.finalizeRecovery(); return null; }); } + private boolean hasUncommittedOperations() { + if (indexShard.translogStats().getUncommittedOperations() == 0) { + return false; + } + // In peer recovery, we transfer history from primary to replica, thus we don't have to flush + // if all those uncommitted operations have baked into the existing Lucene index commit already. + final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit( + indexShard.commitStats().getUserData().entrySet()); + return commitInfo.maxSeqNo != commitInfo.localCheckpoint + || commitInfo.maxSeqNo != indexShard.seqNoStats().getMaxSeqNo() + || commitInfo.maxSeqNo != indexShard.getLastSyncedGlobalCheckpoint(); + } + @Override public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) { indexShard.activateWithPrimaryContext(primaryContext); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 4196472334ca9..d1d2630148dd8 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -31,6 +31,7 @@ import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; @@ -52,9 +53,12 @@ import org.elasticsearch.index.analysis.TokenFilterFactory; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.recovery.RecoveryStats; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.analysis.AnalysisModule; +import org.elasticsearch.indices.flush.SyncedFlushUtil; import org.elasticsearch.indices.recovery.RecoveryState.Stage; import org.elasticsearch.node.RecoverySettingsChunkSizePlugin; import org.elasticsearch.plugins.AnalysisPlugin; @@ -84,14 +88,19 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; import static java.util.Collections.singletonMap; +import static java.util.stream.Collectors.toList; import static org.elasticsearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; @@ -910,6 +919,38 @@ public void testDoNotInfinitelyWaitForMapping() { assertHitCount(client().prepareSearch().get(), numDocs); } + public void testRecoveryFlushReplica() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(3); + String indexName = "test-index"; + createIndex(indexName, Settings.builder().put("index.number_of_replicas", 0).put("index.number_of_shards", 1).build()); + int numDocs = randomIntBetween(0, 10); + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, numDocs) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + assertAcked(client().admin().indices().prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put("index.number_of_replicas", 1))); + ensureGreen(indexName); + ShardId shardId = null; + for (ShardStats shardStats : client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()) { + shardId = shardStats.getShardRouting().shardId(); + if (shardStats.getShardRouting().primary() == false) { + assertThat(shardStats.getCommitStats().getNumDocs(), equalTo(numDocs)); + SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit( + shardStats.getCommitStats().getUserData().entrySet()); + assertThat(commitInfo.localCheckpoint, equalTo(shardStats.getSeqNoStats().getLocalCheckpoint())); + assertThat(commitInfo.maxSeqNo, equalTo(shardStats.getSeqNoStats().getMaxSeqNo())); + } + } + SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId); + assertBusy(() -> assertThat(client().admin().indices().prepareSyncedFlush(indexName).get().failedShards(), equalTo(0))); + assertAcked(client().admin().indices().prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put("index.number_of_replicas", 2))); + ensureGreen(indexName); + Set syncIds = Stream.of(client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()) + .map(shardStats -> shardStats.getCommitStats().syncId()) + .collect(Collectors.toSet()); + assertThat(syncIds, hasSize(1)); + } + public static final class TestAnalysisPlugin extends Plugin implements AnalysisPlugin { final AtomicBoolean throwParsingError = new AtomicBoolean(); @Override From baceaad03de7d0ed591ccd2d55654b0f900289c9 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 30 Apr 2019 09:59:05 -0400 Subject: [PATCH 2/8] update comment --- .../org/elasticsearch/indices/recovery/RecoveryTarget.java | 4 ++-- .../org/elasticsearch/indices/recovery/IndexRecoveryIT.java | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 7ab7470b033ab..05df020b9785f 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -316,8 +316,8 @@ private boolean hasUncommittedOperations() { if (indexShard.translogStats().getUncommittedOperations() == 0) { return false; } - // In peer recovery, we transfer history from primary to replica, thus we don't have to flush - // if all those uncommitted operations have baked into the existing Lucene index commit already. + // If a file-based occurs, the primary also sends its translog to the replica. If all of those + // translog operations are in the copying commit already, we should not flush (mainly to reserve syncId). final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit( indexShard.commitStats().getUserData().entrySet()); return commitInfo.maxSeqNo != commitInfo.localCheckpoint diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index d1d2630148dd8..3130cebad7097 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -945,6 +945,7 @@ public void testRecoveryFlushReplica() throws Exception { assertAcked(client().admin().indices().prepareUpdateSettings(indexName) .setSettings(Settings.builder().put("index.number_of_replicas", 2))); ensureGreen(indexName); + // Recovery should keep syncId if no indexing activity on the primary after synced-flush. Set syncIds = Stream.of(client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()) .map(shardStats -> shardStats.getCommitStats().syncId()) .collect(Collectors.toSet()); From 07c3a7c03cc493b63ea6c7300fb4dbc1bbe501e9 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 30 Apr 2019 10:14:19 -0400 Subject: [PATCH 3/8] use count translog ops --- .../indices/recovery/RecoveryTarget.java | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 05df020b9785f..0ebbc107b20e6 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -312,17 +312,9 @@ public void finalizeRecovery(final long globalCheckpoint, ActionListener l }); } - private boolean hasUncommittedOperations() { - if (indexShard.translogStats().getUncommittedOperations() == 0) { - return false; - } - // If a file-based occurs, the primary also sends its translog to the replica. If all of those - // translog operations are in the copying commit already, we should not flush (mainly to reserve syncId). - final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit( - indexShard.commitStats().getUserData().entrySet()); - return commitInfo.maxSeqNo != commitInfo.localCheckpoint - || commitInfo.maxSeqNo != indexShard.seqNoStats().getMaxSeqNo() - || commitInfo.maxSeqNo != indexShard.getLastSyncedGlobalCheckpoint(); + private boolean hasUncommittedOperations() throws IOException { + long localCheckpointOfCommit = Long.parseLong(indexShard.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + return indexShard.estimateNumberOfHistoryOperations("peer-recovery", localCheckpointOfCommit + 1) > 0; } @Override From 6e952c5cc45067c4cfb54c3a8e0e54febf57cfcf Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 6 May 2019 10:54:43 -0400 Subject: [PATCH 4/8] always check max_seq_no = gcp in readonly engine --- .../index/engine/ReadOnlyEngine.java | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 5acac256dbd50..796f10674dbf7 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -30,7 +30,6 @@ import org.apache.lucene.search.SearcherManager; import org.apache.lucene.store.Directory; import org.apache.lucene.store.Lock; -import org.elasticsearch.Version; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.util.concurrent.ReleasableLock; @@ -123,19 +122,9 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats } protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(final SeqNoStats seqNoStats) { - // Before 8.0 the global checkpoint is not known and up to date when the engine is created after - // peer recovery, so we only check the max seq no / global checkpoint coherency when the global - // checkpoint is different from the unassigned sequence number value. - // In addition to that we only execute the check if the index the engine belongs to has been - // created after the refactoring of the Close Index API and its TransportVerifyShardBeforeCloseAction - // that guarantee that all operations have been flushed to Lucene. - final Version indexVersionCreated = engineConfig.getIndexSettings().getIndexVersionCreated(); - if (indexVersionCreated.onOrAfter(Version.V_7_2_0) || - (seqNoStats.getGlobalCheckpoint() != SequenceNumbers.UNASSIGNED_SEQ_NO && indexVersionCreated.onOrAfter(Version.V_6_7_0))) { - if (seqNoStats.getMaxSeqNo() != seqNoStats.getGlobalCheckpoint()) { - throw new IllegalStateException("Maximum sequence number [" + seqNoStats.getMaxSeqNo() - + "] from last commit does not match global checkpoint [" + seqNoStats.getGlobalCheckpoint() + "]"); - } + if (seqNoStats.getMaxSeqNo() != seqNoStats.getGlobalCheckpoint()) { + throw new IllegalStateException("Maximum sequence number [" + seqNoStats.getMaxSeqNo() + + "] from last commit does not match global checkpoint [" + seqNoStats.getGlobalCheckpoint() + "]"); } assert assertMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats.getMaxSeqNo(), seqNoStats.getGlobalCheckpoint()); } From d3ee8fa954b571ee93ba150c99ed4ab103c9a63d Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 16 May 2019 09:34:00 -0400 Subject: [PATCH 5/8] Revert "always check max_seq_no = gcp in readonly engine" This reverts commit 6e952c5cc45067c4cfb54c3a8e0e54febf57cfcf. --- .../index/engine/ReadOnlyEngine.java | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 796f10674dbf7..5acac256dbd50 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -30,6 +30,7 @@ import org.apache.lucene.search.SearcherManager; import org.apache.lucene.store.Directory; import org.apache.lucene.store.Lock; +import org.elasticsearch.Version; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.util.concurrent.ReleasableLock; @@ -122,9 +123,19 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats } protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(final SeqNoStats seqNoStats) { - if (seqNoStats.getMaxSeqNo() != seqNoStats.getGlobalCheckpoint()) { - throw new IllegalStateException("Maximum sequence number [" + seqNoStats.getMaxSeqNo() - + "] from last commit does not match global checkpoint [" + seqNoStats.getGlobalCheckpoint() + "]"); + // Before 8.0 the global checkpoint is not known and up to date when the engine is created after + // peer recovery, so we only check the max seq no / global checkpoint coherency when the global + // checkpoint is different from the unassigned sequence number value. + // In addition to that we only execute the check if the index the engine belongs to has been + // created after the refactoring of the Close Index API and its TransportVerifyShardBeforeCloseAction + // that guarantee that all operations have been flushed to Lucene. + final Version indexVersionCreated = engineConfig.getIndexSettings().getIndexVersionCreated(); + if (indexVersionCreated.onOrAfter(Version.V_7_2_0) || + (seqNoStats.getGlobalCheckpoint() != SequenceNumbers.UNASSIGNED_SEQ_NO && indexVersionCreated.onOrAfter(Version.V_6_7_0))) { + if (seqNoStats.getMaxSeqNo() != seqNoStats.getGlobalCheckpoint()) { + throw new IllegalStateException("Maximum sequence number [" + seqNoStats.getMaxSeqNo() + + "] from last commit does not match global checkpoint [" + seqNoStats.getGlobalCheckpoint() + "]"); + } } assert assertMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats.getMaxSeqNo(), seqNoStats.getGlobalCheckpoint()); } From a4aff09f6ac3368973461a8fdaa7127f614529d2 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 16 May 2019 15:33:01 -0400 Subject: [PATCH 6/8] Fix compilation --- .../java/org/elasticsearch/indices/recovery/RecoveryTarget.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 80d48e84277da..b3c6d12ab96e3 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -39,6 +39,7 @@ import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLeases; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardNotRecoveringException; import org.elasticsearch.index.shard.IndexShardState; From 91811b78dbc98f98d36535d66c01e20ad81f62ef Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 21 May 2019 15:32:11 -0400 Subject: [PATCH 7/8] AwaitsFix testRefreshMetric --- .../test/java/org/elasticsearch/index/shard/IndexShardTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 0be7b4433fac3..ff15de02c0695 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1481,6 +1481,7 @@ public String[] listAll() throws IOException { } } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/42211") public void testRefreshMetric() throws IOException { IndexShard shard = newStartedShard(); assertThat(shard.refreshStats().getTotal(), equalTo(2L)); // refresh on: finalize and end of recovery From c8d08a347bba99d1d9cba65d185b8841f80a3ef8 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 21 May 2019 19:58:45 -0400 Subject: [PATCH 8/8] Revert "AwaitsFix testRefreshMetric" This reverts commit 91811b78dbc98f98d36535d66c01e20ad81f62ef. --- .../test/java/org/elasticsearch/index/shard/IndexShardTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index e2a0fe21f025b..64b0c0db1dc8c 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1522,7 +1522,6 @@ public String[] listAll() throws IOException { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/42211") public void testRefreshMetric() throws IOException { IndexShard shard = newStartedShard(); assertThat(shard.refreshStats().getTotal(), equalTo(2L)); // refresh on: finalize and end of recovery