From 783cd1ea15f9f5aedc70167f70cca46bc039ff03 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sun, 21 Apr 2019 18:25:12 -0400 Subject: [PATCH 01/14] Synced flush indices before closing --- ...TransportVerifyShardBeforeCloseAction.java | 26 ++++++++++++++++--- .../metadata/MetaDataIndexStateService.java | 3 ++- ...portVerifyShardBeforeCloseActionTests.java | 5 ++-- .../indices/state/CloseIndexIT.java | 25 +++++++++++++++--- 4 files changed, 49 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index 22a0777f7bffb..d5cb5263c1ce1 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -20,6 +20,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.support.ActionFilters; @@ -32,12 +33,15 @@ import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.tasks.TaskId; @@ -110,8 +114,12 @@ private void executeShardOperation(final ShardRequest request, final IndexShard throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + request.clusterBlock() + " before closing"); } indexShard.verifyShardBeforeIndexClosing(); - indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true)); - logger.trace("{} shard is ready for closing", shardId); + final Engine.CommitId commitId = indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true)); + // don't issue synced flush for recovering copies. + if (indexShard.state() == IndexShardState.STARTED && Strings.hasText(request.syncId)) { + indexShard.syncFlush(request.syncId, commitId); + } + logger.trace("{} shard is ready for closing with sync_id [{}]", shardId, request.syncId); } @Override @@ -135,21 +143,28 @@ public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String all public static class ShardRequest extends ReplicationRequest { private final ClusterBlock clusterBlock; + private final String syncId; ShardRequest(StreamInput in) throws IOException { super(in); clusterBlock = new ClusterBlock(in); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + syncId = in.readString(); + } else { + syncId = ""; + } } - public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final TaskId parentTaskId) { + public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final String syncId, final TaskId parentTaskId) { super(shardId); this.clusterBlock = Objects.requireNonNull(clusterBlock); + this.syncId = Objects.requireNonNull(syncId); setParentTask(parentTaskId); } @Override public String toString() { - return "verify shard " + shardId + " before close with block " + clusterBlock; + return "verify shard " + shardId + " before close with block " + clusterBlock + " sync_id " + syncId; } @Override @@ -161,6 +176,9 @@ public void readFrom(final StreamInput in) { public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); clusterBlock.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeString(syncId); + } } public ClusterBlock clusterBlock() { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index a004d0a5a2324..60b3a8231a21e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -384,8 +384,9 @@ private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shar return; } final TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), request.taskId()); + final String syncId = UUIDs.randomBase64UUID(); final TransportVerifyShardBeforeCloseAction.ShardRequest shardRequest = - new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, closingBlock, parentTaskId); + new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, closingBlock, syncId, parentTaskId); if (request.ackTimeout() != null) { shardRequest.timeout(request.ackTimeout()); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index d7974ed1c6365..20cf38c80bc28 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -39,6 +39,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; @@ -138,7 +139,7 @@ public static void afterClass() { private void executeOnPrimaryOrReplica() throws Throwable { final TaskId taskId = new TaskId("_node_id", randomNonNegativeLong()); final TransportVerifyShardBeforeCloseAction.ShardRequest request = - new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId(), clusterBlock, taskId); + new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId(), clusterBlock, UUIDs.randomBase64UUID(), taskId); final PlainActionFuture res = PlainActionFuture.newFuture(); action.shardOperationOnPrimary(request, indexShard, ActionListener.wrap( r -> { @@ -227,7 +228,7 @@ public void testUnavailableShardsMarkedAsStale() throws Exception { final PlainActionFuture listener = new PlainActionFuture<>(); TaskId taskId = new TaskId(clusterService.localNode().getId(), 0L); TransportVerifyShardBeforeCloseAction.ShardRequest request = - new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, clusterBlock, taskId); + new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, clusterBlock, UUIDs.randomBase64UUID(), taskId); ReplicationOperation.Replicas proxy = action.newReplicasProxy(); ReplicationOperation operation = new ReplicationOperation<>( diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index af98ba990b253..b6cd03d23e346 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -34,6 +34,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.indices.IndexClosedException; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.test.ESIntegTestCase; @@ -50,6 +51,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; @@ -98,17 +100,34 @@ public void testCloseNullIndex() { } public void testCloseIndex() throws Exception { + final int numberOfReplicas = randomIntBetween(0, 2); + internalCluster().ensureAtLeastNumDataNodes(numberOfReplicas + 1); final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - createIndex(indexName); - + createIndex(indexName, Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 5)) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), numberOfReplicas).build()); + ensureGreen(indexName); final int nbDocs = randomIntBetween(0, 50); indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, nbDocs) .mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList())); assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName))); assertIndexIsClosed(indexName); - + ensureGreen(indexName); + for (RecoveryState recoveryState : + client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) { + if (recoveryState.getPrimary() == false) { + assertThat(recoveryState.getIndex().fileDetails(), empty()); + } + } assertAcked(client().admin().indices().prepareOpen(indexName)); + ensureGreen(indexName); + for (RecoveryState recoveryState : + client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) { + if (recoveryState.getPrimary() == false) { + assertThat(recoveryState.getIndex().fileDetails(), empty()); + } + } assertHitCount(client().prepareSearch(indexName).setSize(0).get(), nbDocs); } From 1c485e02e6dc79a9500f9f673e369f6ae51273a7 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sun, 21 Apr 2019 20:05:20 -0400 Subject: [PATCH 02/14] move prepare action to internal engine readonly engine does not support synced-flush --- ...TransportVerifyShardBeforeCloseAction.java | 13 ++-------- .../elasticsearch/index/engine/Engine.java | 11 ++------ .../index/engine/InternalEngine.java | 21 ++++++++++++++++ .../index/engine/ReadOnlyEngine.java | 4 +-- .../elasticsearch/index/shard/IndexShard.java | 7 +++--- ...portVerifyShardBeforeCloseActionTests.java | 25 +++---------------- .../index/engine/ReadOnlyEngineTests.java | 7 +++--- .../ccr/index/engine/FollowingEngine.java | 2 +- .../index/engine/FollowingEngineTests.java | 9 ++++--- 9 files changed, 46 insertions(+), 53 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index d5cb5263c1ce1..ecf058bc02edc 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -22,7 +22,6 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationRequest; @@ -33,15 +32,12 @@ import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.tasks.TaskId; @@ -113,13 +109,8 @@ private void executeShardOperation(final ShardRequest request, final IndexShard if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), request.clusterBlock()) == false) { throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + request.clusterBlock() + " before closing"); } - indexShard.verifyShardBeforeIndexClosing(); - final Engine.CommitId commitId = indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true)); - // don't issue synced flush for recovering copies. - if (indexShard.state() == IndexShardState.STARTED && Strings.hasText(request.syncId)) { - indexShard.syncFlush(request.syncId, commitId); - } - logger.trace("{} shard is ready for closing with sync_id [{}]", shardId, request.syncId); + indexShard.prepareShardBeforeIndexClosing(request.syncId); + logger.trace("{} shard is ready for closing", shardId); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 9bed93c371696..d52099c97abc4 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -265,18 +265,11 @@ protected final DocsStats docsStats(IndexReader indexReader) { } /** - * Performs the pre-closing checks on the {@link Engine}. + * Performs the pre-closing action on the {@link Engine}. * * @throws IllegalStateException if the sanity checks failed */ - public void verifyEngineBeforeIndexClosing() throws IllegalStateException { - final long globalCheckpoint = engineConfig.getGlobalCheckpointSupplier().getAsLong(); - final long maxSeqNo = getSeqNoStats(globalCheckpoint).getMaxSeqNo(); - if (globalCheckpoint != maxSeqNo) { - throw new IllegalStateException("Global checkpoint [" + globalCheckpoint - + "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId); - } - } + public abstract void prepareEngineBeforeIndexClosing(String syncId) throws IllegalStateException; /** * A throttling class that can be activated, causing the diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 654d31d22671a..a3411b8b03127 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2756,4 +2756,25 @@ private static void trimUnsafeCommits(EngineConfig engineConfig) throws IOExcept final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogPath, translogUUID); store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, engineConfig.getIndexSettings().getIndexVersionCreated()); } + + protected void verifyEngineBeforeIndexClosing() { + final long globalCheckpoint = engineConfig.getGlobalCheckpointSupplier().getAsLong(); + final long maxSeqNo = localCheckpointTracker.getMaxSeqNo(); + if (globalCheckpoint != maxSeqNo) { + throw new IllegalStateException("Global checkpoint [" + globalCheckpoint + + "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId); + } + } + + @Override + public void prepareEngineBeforeIndexClosing(String syncId) throws IllegalStateException { + try (ReleasableLock ignored = writeLock.acquire()) { + ensureOpen(); + verifyEngineBeforeIndexClosing(); + final CommitId commitId = flush(true, true); + if (syncId != null) { + syncFlush(syncId, commitId); + } + } + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 777aff88e9dbc..b21a4fbbfed75 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -146,7 +146,7 @@ protected boolean assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, fi } @Override - public void verifyEngineBeforeIndexClosing() throws IllegalStateException { + public void prepareEngineBeforeIndexClosing(String syncId) throws IllegalStateException { // the value of the global checkpoint is verified when the read-only engine is opened, // and it is not expected to change during the lifecycle of the engine. We could also // check this value before closing the read-only engine but if something went wrong @@ -156,7 +156,7 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException { } protected final DirectoryReader wrapReader(DirectoryReader reader, - Function readerWrapperFunction) throws IOException { + Function readerWrapperFunction) throws IOException { reader = ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId()); if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index c5dc2d024aaa8..7109fad1cdfff 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -3159,12 +3159,13 @@ public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { } /** - * Performs the pre-closing checks on the {@link IndexShard}. + * Performs the pre-closing action on the {@link IndexShard}. * * @throws IllegalStateException if the sanity checks failed */ - public void verifyShardBeforeIndexClosing() throws IllegalStateException { - getEngine().verifyEngineBeforeIndexClosing(); + public void prepareShardBeforeIndexClosing(String syncId) throws IllegalStateException { + // don't issue synced-flush for recovering shards + getEngine().prepareEngineBeforeIndexClosing(state == IndexShardState.STARTED ? syncId : null); } RetentionLeaseSyncer getRetentionLeaseSyncer() { diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index 20cf38c80bc28..37d5312d15ee4 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -20,7 +20,6 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.replication.ReplicationOperation; @@ -41,7 +40,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; @@ -57,7 +55,6 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; -import org.mockito.ArgumentCaptor; import java.util.Collections; import java.util.List; @@ -72,8 +69,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; -import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -157,22 +153,12 @@ private void executeOnPrimaryOrReplica() throws Throwable { } } - public void testShardIsFlushed() throws Throwable { - final ArgumentCaptor flushRequest = ArgumentCaptor.forClass(FlushRequest.class); - when(indexShard.flush(flushRequest.capture())).thenReturn(new Engine.CommitId(new byte[0])); - - executeOnPrimaryOrReplica(); - verify(indexShard, times(1)).flush(any(FlushRequest.class)); - assertThat(flushRequest.getValue().force(), is(true)); - } - public void testOperationFailsWhenNotBlocked() { when(indexShard.getActiveOperationsCount()).thenReturn(randomIntBetween(0, 10)); IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica); assertThat(exception.getMessage(), equalTo("Index shard " + indexShard.shardId() + " is not blocking all operations during closing")); - verify(indexShard, times(0)).flush(any(FlushRequest.class)); } public void testOperationFailsWithNoBlock() { @@ -181,20 +167,17 @@ public void testOperationFailsWithNoBlock() { IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica); assertThat(exception.getMessage(), equalTo("Index shard " + indexShard.shardId() + " must be blocked by " + clusterBlock + " before closing")); - verify(indexShard, times(0)).flush(any(FlushRequest.class)); } public void testVerifyShardBeforeIndexClosing() throws Throwable { executeOnPrimaryOrReplica(); - verify(indexShard, times(1)).verifyShardBeforeIndexClosing(); - verify(indexShard, times(1)).flush(any(FlushRequest.class)); + verify(indexShard, times(1)).prepareShardBeforeIndexClosing(anyString()); } public void testVerifyShardBeforeIndexClosingFailed() { - doThrow(new IllegalStateException("test")).when(indexShard).verifyShardBeforeIndexClosing(); + doThrow(new IllegalStateException("test")).when(indexShard).prepareShardBeforeIndexClosing(anyString()); expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica); - verify(indexShard, times(1)).verifyShardBeforeIndexClosing(); - verify(indexShard, times(0)).flush(any(FlushRequest.class)); + verify(indexShard, times(1)).prepareShardBeforeIndexClosing(anyString()); } public void testUnavailableShardsMarkedAsStale() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java index e66094d7321a7..3175cfcb3cb20 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java @@ -20,6 +20,7 @@ import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.Version; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.mapper.ParsedDocument; @@ -183,10 +184,10 @@ public void testReadOnly() throws IOException { } /** - * Test that {@link ReadOnlyEngine#verifyEngineBeforeIndexClosing()} never fails + * Test that {@link ReadOnlyEngine#prepareEngineBeforeIndexClosing(String)} never fails * whatever the value of the global checkpoint to check is. */ - public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException { + public void testPrepareShardBeforeIndexClosingIsNoOp() throws IOException { IOUtils.close(engine, store); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore()) { @@ -195,7 +196,7 @@ public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException { try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) { globalCheckpoint.set(randomNonNegativeLong()); try { - readOnlyEngine.verifyEngineBeforeIndexClosing(); + readOnlyEngine.prepareEngineBeforeIndexClosing(UUIDs.randomBase64UUID()); } catch (final IllegalStateException e) { fail("Read-only engine pre-closing verifications failed"); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index bbb0689a8a7e6..c1cfbeb2e296b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -210,7 +210,7 @@ public long getNumberOfOptimizedIndexing() { } @Override - public void verifyEngineBeforeIndexClosing() throws IllegalStateException { + protected void verifyEngineBeforeIndexClosing() { // the value of the global checkpoint is not verified when the following engine is closed, // allowing it to be closed even in the case where all operations have not been fetched and // processed from the leader and the operations history has gaps. This way the following diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index dfac5ef2654b8..2ceaaabf08d78 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.CheckedBiFunction; import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; @@ -643,10 +644,10 @@ public void testProcessOnceOnPrimary() throws Exception { } /** - * Test that {@link FollowingEngine#verifyEngineBeforeIndexClosing()} never fails + * Test that {@link FollowingEngine#prepareEngineBeforeIndexClosing(String)} never fails * whatever the value of the global checkpoint to check is. */ - public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException { + public void testPrepareShardBeforeIndexClosingIsNoOp() throws IOException { final long seqNo = randomIntBetween(0, Integer.MAX_VALUE); runIndexTest( seqNo, @@ -654,7 +655,9 @@ public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException { (followingEngine, index) -> { globalCheckpoint.set(randomNonNegativeLong()); try { - followingEngine.verifyEngineBeforeIndexClosing(); + String syncId = UUIDs.randomBase64UUID(); + followingEngine.prepareEngineBeforeIndexClosing(syncId); + assertThat(followingEngine.commitStats().syncId(), equalTo(syncId)); } catch (final IllegalStateException e) { fail("Following engine pre-closing verifications failed"); } From da0f7feb615054207364baabc378efcafe7afc67 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sun, 21 Apr 2019 22:47:10 -0400 Subject: [PATCH 03/14] synced-flush post_recovery state --- .../main/java/org/elasticsearch/index/shard/IndexShard.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 7109fad1cdfff..bed7ec4e01378 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -3165,7 +3165,8 @@ public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { */ public void prepareShardBeforeIndexClosing(String syncId) throws IllegalStateException { // don't issue synced-flush for recovering shards - getEngine().prepareEngineBeforeIndexClosing(state == IndexShardState.STARTED ? syncId : null); + final boolean canSyncedFlush = state == IndexShardState.STARTED || state == IndexShardState.POST_RECOVERY; + getEngine().prepareEngineBeforeIndexClosing(canSyncedFlush ? syncId : null); } RetentionLeaseSyncer getRetentionLeaseSyncer() { From 9ed30cda0c143cfdb5b67343a26237537793f7d8 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 26 Apr 2019 18:26:10 -0400 Subject: [PATCH 04/14] reuse syncId --- ...TransportVerifyShardBeforeCloseAction.java | 23 ++-- .../elasticsearch/index/engine/Engine.java | 5 +- .../index/engine/InternalEngine.java | 13 +- .../index/engine/ReadOnlyEngine.java | 4 +- .../elasticsearch/index/shard/IndexShard.java | 4 +- ...portVerifyShardBeforeCloseActionTests.java | 2 +- .../indices/state/CloseIndexIT.java | 117 +++++++++++++++--- 7 files changed, 131 insertions(+), 37 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index ecf058bc02edc..c1f09780dc257 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -85,21 +85,23 @@ protected void acquireReplicaOperationPermit(final IndexShard replica, } @Override - protected void shardOperationOnPrimary(final ShardRequest shardRequest, final IndexShard primary, + protected void shardOperationOnPrimary(final ShardRequest primaryRequest, final IndexShard primary, ActionListener> listener) { ActionListener.completeWith(listener, () -> { - executeShardOperation(shardRequest, primary); - return new PrimaryResult<>(shardRequest, new ReplicationResponse()); + final String syncId = executeShardOperation(primaryRequest, primary); + final ShardRequest replicaRequest = new ShardRequest( + primaryRequest.shardId(), primaryRequest.clusterBlock, syncId, primaryRequest.getParentTask()); + return new PrimaryResult<>(replicaRequest, new ReplicationResponse()); }); } @Override - protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) { + protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) throws IOException { executeShardOperation(shardRequest, replica); return new ReplicaResult(); } - private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) { + private String executeShardOperation(final ShardRequest request, final IndexShard indexShard) throws IOException { final ShardId shardId = indexShard.shardId(); if (indexShard.getActiveOperationsCount() != IndexShard.OPERATIONS_BLOCKED) { throw new IllegalStateException("Index shard " + shardId + " is not blocking all operations during closing"); @@ -109,8 +111,9 @@ private void executeShardOperation(final ShardRequest request, final IndexShard if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), request.clusterBlock()) == false) { throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + request.clusterBlock() + " before closing"); } - indexShard.prepareShardBeforeIndexClosing(request.syncId); + final String syncId = indexShard.prepareShardBeforeIndexClosing(request.syncId); logger.trace("{} shard is ready for closing", shardId); + return syncId; } @Override @@ -140,16 +143,16 @@ public static class ShardRequest extends ReplicationRequest { super(in); clusterBlock = new ClusterBlock(in); if (in.getVersion().onOrAfter(Version.V_8_0_0)) { - syncId = in.readString(); + syncId = in.readOptionalString(); } else { - syncId = ""; + syncId = null; } } public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final String syncId, final TaskId parentTaskId) { super(shardId); this.clusterBlock = Objects.requireNonNull(clusterBlock); - this.syncId = Objects.requireNonNull(syncId); + this.syncId = syncId; setParentTask(parentTaskId); } @@ -168,7 +171,7 @@ public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); clusterBlock.writeTo(out); if (out.getVersion().onOrAfter(Version.V_8_0_0)) { - out.writeString(syncId); + out.writeOptionalString(syncId); } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index d52099c97abc4..c3f799ec208a4 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -267,9 +267,12 @@ protected final DocsStats docsStats(IndexReader indexReader) { /** * Performs the pre-closing action on the {@link Engine}. * + * @param syncId a syncId that an engine can use to seal its index commit. If there was no indexing activity since the last sealed + * index commit, the engine might skip synced-flush and returns the existing syncId instead of the provided syncId. + * @return either the provided syncId or the existing syncId * @throws IllegalStateException if the sanity checks failed */ - public abstract void prepareEngineBeforeIndexClosing(String syncId) throws IllegalStateException; + public abstract String prepareEngineBeforeIndexClosing(String syncId) throws IOException; /** * A throttling class that can be activated, causing the diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index a3411b8b03127..0066d6eebed6e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2758,7 +2758,7 @@ private static void trimUnsafeCommits(EngineConfig engineConfig) throws IOExcept } protected void verifyEngineBeforeIndexClosing() { - final long globalCheckpoint = engineConfig.getGlobalCheckpointSupplier().getAsLong(); + final long globalCheckpoint = translog.getLastSyncedGlobalCheckpoint(); final long maxSeqNo = localCheckpointTracker.getMaxSeqNo(); if (globalCheckpoint != maxSeqNo) { throw new IllegalStateException("Global checkpoint [" + globalCheckpoint @@ -2767,14 +2767,23 @@ protected void verifyEngineBeforeIndexClosing() { } @Override - public void prepareEngineBeforeIndexClosing(String syncId) throws IllegalStateException { + public String prepareEngineBeforeIndexClosing(String syncId) throws IOException { try (ReleasableLock ignored = writeLock.acquire()) { ensureOpen(); + syncTranslog(); // make sure that we persist the global checkpoint to translog checkpoint verifyEngineBeforeIndexClosing(); + String existingSyncId = lastCommittedSegmentInfos.userData.get(Engine.SYNC_COMMIT_ID); + // some out of order operations don't change Lucene but fill in sequence numbers and advance the local checkpoint. + long committedCheckpoint = Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + if (existingSyncId != null && committedCheckpoint == getLocalCheckpoint() && indexWriter.hasUncommittedChanges() == false) { + return existingSyncId; + } + // force flush then synced-flush with the provided syncId final CommitId commitId = flush(true, true); if (syncId != null) { syncFlush(syncId, commitId); } + return syncId; } } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index b21a4fbbfed75..7fba4694818f8 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -31,6 +31,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.Lock; import org.elasticsearch.Version; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.util.concurrent.ReleasableLock; @@ -146,13 +147,14 @@ protected boolean assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, fi } @Override - public void prepareEngineBeforeIndexClosing(String syncId) throws IllegalStateException { + public String prepareEngineBeforeIndexClosing(String syncId) { // the value of the global checkpoint is verified when the read-only engine is opened, // and it is not expected to change during the lifecycle of the engine. We could also // check this value before closing the read-only engine but if something went wrong // and the global checkpoint is not in-sync with the max. sequence number anymore, // checking the value here again would prevent the read-only engine to be closed and // reopened as an internal engine, which would be the path to fix the issue. + return lastCommittedSegmentInfos.userData.get(Engine.SYNC_COMMIT_ID); } protected final DirectoryReader wrapReader(DirectoryReader reader, diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 179207d69d361..069357d4253b9 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -3163,10 +3163,10 @@ public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { * * @throws IllegalStateException if the sanity checks failed */ - public void prepareShardBeforeIndexClosing(String syncId) throws IllegalStateException { + public String prepareShardBeforeIndexClosing(String syncId) throws IOException { // don't issue synced-flush for recovering shards final boolean canSyncedFlush = state == IndexShardState.STARTED || state == IndexShardState.POST_RECOVERY; - getEngine().prepareEngineBeforeIndexClosing(canSyncedFlush ? syncId : null); + return getEngine().prepareEngineBeforeIndexClosing(canSyncedFlush ? syncId : null); } RetentionLeaseSyncer getRetentionLeaseSyncer() { diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index 37d5312d15ee4..689ebe9fc8ead 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -174,7 +174,7 @@ public void testVerifyShardBeforeIndexClosing() throws Throwable { verify(indexShard, times(1)).prepareShardBeforeIndexClosing(anyString()); } - public void testVerifyShardBeforeIndexClosingFailed() { + public void testVerifyShardBeforeIndexClosingFailed() throws Exception { doThrow(new IllegalStateException("test")).when(indexShard).prepareShardBeforeIndexClosing(anyString()); expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica); verify(indexShard, times(1)).prepareShardBeforeIndexClosing(anyString()); diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index b6cd03d23e346..385b4e6fde9d5 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -21,7 +21,9 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.health.ClusterHealthStatus; @@ -33,10 +35,12 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.test.ESIntegTestCase; +import org.hamcrest.Matchers; import java.util.ArrayList; import java.util.List; @@ -50,6 +54,7 @@ import static org.elasticsearch.search.internal.SearchContext.TRACK_TOTAL_HITS_ACCURATE; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -69,6 +74,14 @@ public Settings indexSettings() { return super.indexSettings(); } + @Override + protected void beforeIndexDeletion() throws Exception { + super.beforeIndexDeletion(); + internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex(); + internalCluster().assertSeqNos(); + internalCluster().assertSameDocIdsOnShards(); + } + public void testCloseMissingIndex() { IndexNotFoundException e = expectThrows(IndexNotFoundException.class, () -> client().admin().indices().prepareClose("test").get()); assertThat(e.getMessage(), is("no such index [test]")); @@ -100,34 +113,17 @@ public void testCloseNullIndex() { } public void testCloseIndex() throws Exception { - final int numberOfReplicas = randomIntBetween(0, 2); - internalCluster().ensureAtLeastNumDataNodes(numberOfReplicas + 1); final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - createIndex(indexName, Settings.builder() - .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 5)) - .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), numberOfReplicas).build()); - ensureGreen(indexName); + createIndex(indexName); + final int nbDocs = randomIntBetween(0, 50); indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, nbDocs) .mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList())); assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName))); assertIndexIsClosed(indexName); - ensureGreen(indexName); - for (RecoveryState recoveryState : - client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) { - if (recoveryState.getPrimary() == false) { - assertThat(recoveryState.getIndex().fileDetails(), empty()); - } - } + assertAcked(client().admin().indices().prepareOpen(indexName)); - ensureGreen(indexName); - for (RecoveryState recoveryState : - client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) { - if (recoveryState.getPrimary() == false) { - assertThat(recoveryState.getIndex().fileDetails(), empty()); - } - } assertHitCount(client().prepareSearch(indexName).setSize(0).get(), nbDocs); } @@ -357,6 +353,67 @@ public void testCloseIndexWaitForActiveShards() throws Exception { assertIndexIsClosed(indexName); } + public void testSyncedFlushBeforeClosing() throws Exception { + final String indexName = "synced_flush_before_closing"; + int numberOfReplicas = between(0, 2); + internalCluster().ensureAtLeastNumDataNodes(numberOfReplicas + between(1, 2)); + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) + .put("index.routing.rebalance.enable", "none") + .build()); + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, randomIntBetween(0, 50)) + .mapToObj(i -> client().prepareIndex(indexName, "_doc").setSource("num", i)).collect(toList())); + ensureGreen(indexName); + assertAcked(client().admin().indices().prepareClose(indexName).get()); + assertIndexIsClosed(indexName); + assertNoFileBasedRecovery(indexName); + String syncId = null; + for (ShardStats shardStats : getShardStats(indexName)) { + CommitStats commitStats = shardStats.getCommitStats(); + if (syncId == null) { + syncId = commitStats.syncId(); + } + assertThat(shardStats.getShardRouting() + " commit [" + commitStats + "]", + commitStats.syncId(), allOf(notNullValue(), equalTo(syncId))); + } + // Open a closed index should execute syncId recovery + assertAcked(client().admin().indices().prepareOpen(indexName).get()); + assertIndexIsOpened(indexName); + ensureGreen(indexName); + assertNoFileBasedRecovery(indexName); + assertSyncIdExists(indexName, syncId); + internalCluster().assertSameDocIdsOnShards(); + + // Close should not overwrite syncId if there's no write activity since the last close + assertAcked(client().admin().indices().prepareClose(indexName).get()); + assertIndexIsClosed(indexName); + ensureGreen(indexName); + assertNoFileBasedRecovery(indexName); + assertSyncIdExists(indexName, syncId); + internalCluster().assertSameDocIdsOnShards(); + + // Open for indexing + assertAcked(client().admin().indices().prepareOpen(indexName).get()); + assertIndexIsOpened(indexName); + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, randomIntBetween(1, 50)) + .mapToObj(i -> client().prepareIndex(indexName, "_doc").setSource("num", i)).collect(toList())); + // Close should overwrite syncId as there's indexing activity since the last close + assertAcked(client().admin().indices().prepareClose(indexName).get()); + assertIndexIsClosed(indexName); + assertNoFileBasedRecovery(indexName); + String newSyncId = null; + for (ShardStats shardStats : getShardStats(indexName)) { + CommitStats commitStats = shardStats.getCommitStats(); + if (newSyncId == null) { + newSyncId = commitStats.syncId(); + } + assertThat(shardStats.getShardRouting() + " commit [" + commitStats + "]", + commitStats.syncId(), allOf(notNullValue(), equalTo(newSyncId))); + } + assertThat(newSyncId, Matchers.not(equalTo(syncId))); + } + static void assertIndexIsClosed(final String... indices) { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); for (String index : indices) { @@ -402,4 +459,24 @@ static void assertException(final Throwable throwable, final String indexName) { fail("Unexpected exception: " + t); } } + + void assertSyncIdExists(String indexName, String expectedSyncId) { + for (ShardStats shardStats : getShardStats(indexName)) { + CommitStats commitStats = shardStats.getCommitStats(); + assertThat(shardStats.getShardRouting() + " commit: " + commitStats, commitStats.syncId(), equalTo(expectedSyncId)); + } + } + + void assertNoFileBasedRecovery(String indexName) { + for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) { + if (recovery.getPrimary() == false) { + assertThat(recovery.getIndex().fileDetails(), empty()); + } + } + } + + ShardStats[] getShardStats(String indexName) { + return client().admin().indices().prepareStats(indexName) + .setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_CLOSED).get().getIndex(indexName).getShards(); + } } From 68c7d0c37eab81a5b993df9b16891a4f3b32bec2 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 26 Apr 2019 23:00:45 -0400 Subject: [PATCH 05/14] wording --- .../src/main/java/org/elasticsearch/index/engine/Engine.java | 4 ++-- .../java/org/elasticsearch/index/engine/InternalEngine.java | 4 ++-- .../java/org/elasticsearch/index/engine/ReadOnlyEngine.java | 1 - 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index c3f799ec208a4..2ec77259e8847 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -267,8 +267,8 @@ protected final DocsStats docsStats(IndexReader indexReader) { /** * Performs the pre-closing action on the {@link Engine}. * - * @param syncId a syncId that an engine can use to seal its index commit. If there was no indexing activity since the last sealed - * index commit, the engine might skip synced-flush and returns the existing syncId instead of the provided syncId. + * @param syncId a syncId that an engine can use to seal its index commit. If there was no indexing activity since the last seal, + * the engine can chooses to skip synced-flush and returns the existing syncId instead of the provided syncId. * @return either the provided syncId or the existing syncId * @throws IllegalStateException if the sanity checks failed */ diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 0066d6eebed6e..4b13e08647945 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2772,9 +2772,9 @@ public String prepareEngineBeforeIndexClosing(String syncId) throws IOException ensureOpen(); syncTranslog(); // make sure that we persist the global checkpoint to translog checkpoint verifyEngineBeforeIndexClosing(); - String existingSyncId = lastCommittedSegmentInfos.userData.get(Engine.SYNC_COMMIT_ID); + final String existingSyncId = lastCommittedSegmentInfos.userData.get(Engine.SYNC_COMMIT_ID); // some out of order operations don't change Lucene but fill in sequence numbers and advance the local checkpoint. - long committedCheckpoint = Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + final long committedCheckpoint = Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); if (existingSyncId != null && committedCheckpoint == getLocalCheckpoint() && indexWriter.hasUncommittedChanges() == false) { return existingSyncId; } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 7fba4694818f8..18775eb240da8 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -31,7 +31,6 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.Lock; import org.elasticsearch.Version; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.util.concurrent.ReleasableLock; From fe9a294560a6eccc5ce7de948dd8740e14a0a9ee Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 26 Apr 2019 23:01:16 -0400 Subject: [PATCH 06/14] wording --- server/src/main/java/org/elasticsearch/index/engine/Engine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 2ec77259e8847..f1396d89d1e93 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -268,7 +268,7 @@ protected final DocsStats docsStats(IndexReader indexReader) { * Performs the pre-closing action on the {@link Engine}. * * @param syncId a syncId that an engine can use to seal its index commit. If there was no indexing activity since the last seal, - * the engine can chooses to skip synced-flush and returns the existing syncId instead of the provided syncId. + * the engine can choose to skip synced-flush and returns the existing syncId instead of the provided syncId. * @return either the provided syncId or the existing syncId * @throws IllegalStateException if the sanity checks failed */ From 1946d92ce55071951b18a81ab272e23383d641e5 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 27 Apr 2019 12:59:31 -0400 Subject: [PATCH 07/14] fix tests --- .../allocation/ClusterAllocationExplainIT.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java index 941ad3c658aba..1f763342eafe1 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java @@ -1346,9 +1346,14 @@ private void verifyNodeDecisions(XContentParser parser, Map Date: Sun, 28 Apr 2019 16:33:08 -0400 Subject: [PATCH 08/14] simplify syncId logic --- .../org/elasticsearch/index/engine/InternalEngine.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 4b13e08647945..15097b618eaa0 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2772,13 +2772,10 @@ public String prepareEngineBeforeIndexClosing(String syncId) throws IOException ensureOpen(); syncTranslog(); // make sure that we persist the global checkpoint to translog checkpoint verifyEngineBeforeIndexClosing(); - final String existingSyncId = lastCommittedSegmentInfos.userData.get(Engine.SYNC_COMMIT_ID); - // some out of order operations don't change Lucene but fill in sequence numbers and advance the local checkpoint. - final long committedCheckpoint = Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); - if (existingSyncId != null && committedCheckpoint == getLocalCheckpoint() && indexWriter.hasUncommittedChanges() == false) { - return existingSyncId; + // we can reuse the existing syncId if there was indexing activity since the last synced-flush. + if (indexWriter.hasUncommittedChanges() == false && lastCommittedSegmentInfos.userData.containsKey(Engine.SYNC_COMMIT_ID)) { + syncId = lastCommittedSegmentInfos.userData.get(Engine.SYNC_COMMIT_ID); } - // force flush then synced-flush with the provided syncId final CommitId commitId = flush(true, true); if (syncId != null) { syncFlush(syncId, commitId); From 732049e2ae5a9ecce52485efcdc760846e8ac647 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sun, 28 Apr 2019 16:33:53 -0400 Subject: [PATCH 09/14] fix comment --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 15097b618eaa0..2f99051caa929 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2772,7 +2772,7 @@ public String prepareEngineBeforeIndexClosing(String syncId) throws IOException ensureOpen(); syncTranslog(); // make sure that we persist the global checkpoint to translog checkpoint verifyEngineBeforeIndexClosing(); - // we can reuse the existing syncId if there was indexing activity since the last synced-flush. + // we can reuse the existing syncId if there was no indexing activity since the last synced-flush. if (indexWriter.hasUncommittedChanges() == false && lastCommittedSegmentInfos.userData.containsKey(Engine.SYNC_COMMIT_ID)) { syncId = lastCommittedSegmentInfos.userData.get(Engine.SYNC_COMMIT_ID); } From 38bd362d7a187e8c84826d4b78f87cb077ea3f0b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 29 Apr 2019 13:44:23 -0400 Subject: [PATCH 10/14] backout sync id --- ...TransportVerifyShardBeforeCloseAction.java | 32 ++----- .../metadata/MetaDataIndexStateService.java | 3 +- .../elasticsearch/index/engine/Engine.java | 14 ++- .../index/engine/InternalEngine.java | 27 ------ .../index/engine/ReadOnlyEngine.java | 5 +- .../elasticsearch/index/shard/IndexShard.java | 8 +- .../ClusterAllocationExplainIT.java | 11 +-- ...portVerifyShardBeforeCloseActionTests.java | 32 +++++-- .../index/engine/ReadOnlyEngineTests.java | 7 +- .../indices/state/CloseIndexIT.java | 96 ------------------- .../ccr/index/engine/FollowingEngine.java | 2 +- .../index/engine/FollowingEngineTests.java | 9 +- 12 files changed, 59 insertions(+), 187 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index c1f09780dc257..22a0777f7bffb 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -20,8 +20,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationRequest; @@ -85,23 +85,21 @@ protected void acquireReplicaOperationPermit(final IndexShard replica, } @Override - protected void shardOperationOnPrimary(final ShardRequest primaryRequest, final IndexShard primary, + protected void shardOperationOnPrimary(final ShardRequest shardRequest, final IndexShard primary, ActionListener> listener) { ActionListener.completeWith(listener, () -> { - final String syncId = executeShardOperation(primaryRequest, primary); - final ShardRequest replicaRequest = new ShardRequest( - primaryRequest.shardId(), primaryRequest.clusterBlock, syncId, primaryRequest.getParentTask()); - return new PrimaryResult<>(replicaRequest, new ReplicationResponse()); + executeShardOperation(shardRequest, primary); + return new PrimaryResult<>(shardRequest, new ReplicationResponse()); }); } @Override - protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) throws IOException { + protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) { executeShardOperation(shardRequest, replica); return new ReplicaResult(); } - private String executeShardOperation(final ShardRequest request, final IndexShard indexShard) throws IOException { + private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) { final ShardId shardId = indexShard.shardId(); if (indexShard.getActiveOperationsCount() != IndexShard.OPERATIONS_BLOCKED) { throw new IllegalStateException("Index shard " + shardId + " is not blocking all operations during closing"); @@ -111,9 +109,9 @@ private String executeShardOperation(final ShardRequest request, final IndexShar if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), request.clusterBlock()) == false) { throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + request.clusterBlock() + " before closing"); } - final String syncId = indexShard.prepareShardBeforeIndexClosing(request.syncId); + indexShard.verifyShardBeforeIndexClosing(); + indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true)); logger.trace("{} shard is ready for closing", shardId); - return syncId; } @Override @@ -137,28 +135,21 @@ public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String all public static class ShardRequest extends ReplicationRequest { private final ClusterBlock clusterBlock; - private final String syncId; ShardRequest(StreamInput in) throws IOException { super(in); clusterBlock = new ClusterBlock(in); - if (in.getVersion().onOrAfter(Version.V_8_0_0)) { - syncId = in.readOptionalString(); - } else { - syncId = null; - } } - public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final String syncId, final TaskId parentTaskId) { + public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final TaskId parentTaskId) { super(shardId); this.clusterBlock = Objects.requireNonNull(clusterBlock); - this.syncId = syncId; setParentTask(parentTaskId); } @Override public String toString() { - return "verify shard " + shardId + " before close with block " + clusterBlock + " sync_id " + syncId; + return "verify shard " + shardId + " before close with block " + clusterBlock; } @Override @@ -170,9 +161,6 @@ public void readFrom(final StreamInput in) { public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); clusterBlock.writeTo(out); - if (out.getVersion().onOrAfter(Version.V_8_0_0)) { - out.writeOptionalString(syncId); - } } public ClusterBlock clusterBlock() { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index 60b3a8231a21e..a004d0a5a2324 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -384,9 +384,8 @@ private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shar return; } final TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), request.taskId()); - final String syncId = UUIDs.randomBase64UUID(); final TransportVerifyShardBeforeCloseAction.ShardRequest shardRequest = - new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, closingBlock, syncId, parentTaskId); + new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, closingBlock, parentTaskId); if (request.ackTimeout() != null) { shardRequest.timeout(request.ackTimeout()); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index f1396d89d1e93..9bed93c371696 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -265,14 +265,18 @@ protected final DocsStats docsStats(IndexReader indexReader) { } /** - * Performs the pre-closing action on the {@link Engine}. + * Performs the pre-closing checks on the {@link Engine}. * - * @param syncId a syncId that an engine can use to seal its index commit. If there was no indexing activity since the last seal, - * the engine can choose to skip synced-flush and returns the existing syncId instead of the provided syncId. - * @return either the provided syncId or the existing syncId * @throws IllegalStateException if the sanity checks failed */ - public abstract String prepareEngineBeforeIndexClosing(String syncId) throws IOException; + public void verifyEngineBeforeIndexClosing() throws IllegalStateException { + final long globalCheckpoint = engineConfig.getGlobalCheckpointSupplier().getAsLong(); + final long maxSeqNo = getSeqNoStats(globalCheckpoint).getMaxSeqNo(); + if (globalCheckpoint != maxSeqNo) { + throw new IllegalStateException("Global checkpoint [" + globalCheckpoint + + "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId); + } + } /** * A throttling class that can be activated, causing the diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 2f99051caa929..654d31d22671a 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2756,31 +2756,4 @@ private static void trimUnsafeCommits(EngineConfig engineConfig) throws IOExcept final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogPath, translogUUID); store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, engineConfig.getIndexSettings().getIndexVersionCreated()); } - - protected void verifyEngineBeforeIndexClosing() { - final long globalCheckpoint = translog.getLastSyncedGlobalCheckpoint(); - final long maxSeqNo = localCheckpointTracker.getMaxSeqNo(); - if (globalCheckpoint != maxSeqNo) { - throw new IllegalStateException("Global checkpoint [" + globalCheckpoint - + "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId); - } - } - - @Override - public String prepareEngineBeforeIndexClosing(String syncId) throws IOException { - try (ReleasableLock ignored = writeLock.acquire()) { - ensureOpen(); - syncTranslog(); // make sure that we persist the global checkpoint to translog checkpoint - verifyEngineBeforeIndexClosing(); - // we can reuse the existing syncId if there was no indexing activity since the last synced-flush. - if (indexWriter.hasUncommittedChanges() == false && lastCommittedSegmentInfos.userData.containsKey(Engine.SYNC_COMMIT_ID)) { - syncId = lastCommittedSegmentInfos.userData.get(Engine.SYNC_COMMIT_ID); - } - final CommitId commitId = flush(true, true); - if (syncId != null) { - syncFlush(syncId, commitId); - } - return syncId; - } - } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 18775eb240da8..777aff88e9dbc 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -146,18 +146,17 @@ protected boolean assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, fi } @Override - public String prepareEngineBeforeIndexClosing(String syncId) { + public void verifyEngineBeforeIndexClosing() throws IllegalStateException { // the value of the global checkpoint is verified when the read-only engine is opened, // and it is not expected to change during the lifecycle of the engine. We could also // check this value before closing the read-only engine but if something went wrong // and the global checkpoint is not in-sync with the max. sequence number anymore, // checking the value here again would prevent the read-only engine to be closed and // reopened as an internal engine, which would be the path to fix the issue. - return lastCommittedSegmentInfos.userData.get(Engine.SYNC_COMMIT_ID); } protected final DirectoryReader wrapReader(DirectoryReader reader, - Function readerWrapperFunction) throws IOException { + Function readerWrapperFunction) throws IOException { reader = ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId()); if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index aca10b0d058cf..2476eabf980c6 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -3179,14 +3179,12 @@ public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { } /** - * Performs the pre-closing action on the {@link IndexShard}. + * Performs the pre-closing checks on the {@link IndexShard}. * * @throws IllegalStateException if the sanity checks failed */ - public String prepareShardBeforeIndexClosing(String syncId) throws IOException { - // don't issue synced-flush for recovering shards - final boolean canSyncedFlush = state == IndexShardState.STARTED || state == IndexShardState.POST_RECOVERY; - return getEngine().prepareEngineBeforeIndexClosing(canSyncedFlush ? syncId : null); + public void verifyShardBeforeIndexClosing() throws IllegalStateException { + getEngine().verifyEngineBeforeIndexClosing(); } RetentionLeaseSyncer getRetentionLeaseSyncer() { diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java index 1f763342eafe1..941ad3c658aba 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java @@ -1346,14 +1346,9 @@ private void verifyNodeDecisions(XContentParser parser, Map res = PlainActionFuture.newFuture(); action.shardOperationOnPrimary(request, indexShard, ActionListener.wrap( r -> { @@ -153,12 +156,22 @@ private void executeOnPrimaryOrReplica() throws Throwable { } } + public void testShardIsFlushed() throws Throwable { + final ArgumentCaptor flushRequest = ArgumentCaptor.forClass(FlushRequest.class); + when(indexShard.flush(flushRequest.capture())).thenReturn(new Engine.CommitId(new byte[0])); + + executeOnPrimaryOrReplica(); + verify(indexShard, times(1)).flush(any(FlushRequest.class)); + assertThat(flushRequest.getValue().force(), is(true)); + } + public void testOperationFailsWhenNotBlocked() { when(indexShard.getActiveOperationsCount()).thenReturn(randomIntBetween(0, 10)); IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica); assertThat(exception.getMessage(), equalTo("Index shard " + indexShard.shardId() + " is not blocking all operations during closing")); + verify(indexShard, times(0)).flush(any(FlushRequest.class)); } public void testOperationFailsWithNoBlock() { @@ -167,17 +180,20 @@ public void testOperationFailsWithNoBlock() { IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica); assertThat(exception.getMessage(), equalTo("Index shard " + indexShard.shardId() + " must be blocked by " + clusterBlock + " before closing")); + verify(indexShard, times(0)).flush(any(FlushRequest.class)); } public void testVerifyShardBeforeIndexClosing() throws Throwable { executeOnPrimaryOrReplica(); - verify(indexShard, times(1)).prepareShardBeforeIndexClosing(anyString()); + verify(indexShard, times(1)).verifyShardBeforeIndexClosing(); + verify(indexShard, times(1)).flush(any(FlushRequest.class)); } - public void testVerifyShardBeforeIndexClosingFailed() throws Exception { - doThrow(new IllegalStateException("test")).when(indexShard).prepareShardBeforeIndexClosing(anyString()); + public void testVerifyShardBeforeIndexClosingFailed() { + doThrow(new IllegalStateException("test")).when(indexShard).verifyShardBeforeIndexClosing(); expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica); - verify(indexShard, times(1)).prepareShardBeforeIndexClosing(anyString()); + verify(indexShard, times(1)).verifyShardBeforeIndexClosing(); + verify(indexShard, times(0)).flush(any(FlushRequest.class)); } public void testUnavailableShardsMarkedAsStale() throws Exception { @@ -211,7 +227,7 @@ public void testUnavailableShardsMarkedAsStale() throws Exception { final PlainActionFuture listener = new PlainActionFuture<>(); TaskId taskId = new TaskId(clusterService.localNode().getId(), 0L); TransportVerifyShardBeforeCloseAction.ShardRequest request = - new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, clusterBlock, UUIDs.randomBase64UUID(), taskId); + new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, clusterBlock, taskId); ReplicationOperation.Replicas proxy = action.newReplicasProxy(); ReplicationOperation operation = new ReplicationOperation<>( diff --git a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java index 2de1554e60d13..b689400601dc6 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java @@ -20,7 +20,6 @@ import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.Version; -import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.mapper.ParsedDocument; @@ -184,10 +183,10 @@ public void testReadOnly() throws IOException { } /** - * Test that {@link ReadOnlyEngine#prepareEngineBeforeIndexClosing(String)} never fails + * Test that {@link ReadOnlyEngine#verifyEngineBeforeIndexClosing()} never fails * whatever the value of the global checkpoint to check is. */ - public void testPrepareShardBeforeIndexClosingIsNoOp() throws IOException { + public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException { IOUtils.close(engine, store); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore()) { @@ -196,7 +195,7 @@ public void testPrepareShardBeforeIndexClosingIsNoOp() throws IOException { try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) { globalCheckpoint.set(randomNonNegativeLong()); try { - readOnlyEngine.prepareEngineBeforeIndexClosing(UUIDs.randomBase64UUID()); + readOnlyEngine.verifyEngineBeforeIndexClosing(); } catch (final IllegalStateException e) { fail("Read-only engine pre-closing verifications failed"); } diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index 385b4e6fde9d5..af98ba990b253 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -21,9 +21,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; -import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.support.ActiveShardCount; -import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.health.ClusterHealthStatus; @@ -35,12 +33,9 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.indices.IndexClosedException; -import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.test.ESIntegTestCase; -import org.hamcrest.Matchers; import java.util.ArrayList; import java.util.List; @@ -54,9 +49,7 @@ import static org.elasticsearch.search.internal.SearchContext.TRACK_TOTAL_HITS_ACCURATE; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; @@ -74,14 +67,6 @@ public Settings indexSettings() { return super.indexSettings(); } - @Override - protected void beforeIndexDeletion() throws Exception { - super.beforeIndexDeletion(); - internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex(); - internalCluster().assertSeqNos(); - internalCluster().assertSameDocIdsOnShards(); - } - public void testCloseMissingIndex() { IndexNotFoundException e = expectThrows(IndexNotFoundException.class, () -> client().admin().indices().prepareClose("test").get()); assertThat(e.getMessage(), is("no such index [test]")); @@ -353,67 +338,6 @@ public void testCloseIndexWaitForActiveShards() throws Exception { assertIndexIsClosed(indexName); } - public void testSyncedFlushBeforeClosing() throws Exception { - final String indexName = "synced_flush_before_closing"; - int numberOfReplicas = between(0, 2); - internalCluster().ensureAtLeastNumDataNodes(numberOfReplicas + between(1, 2)); - createIndex(indexName, Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) - .put("index.routing.rebalance.enable", "none") - .build()); - indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, randomIntBetween(0, 50)) - .mapToObj(i -> client().prepareIndex(indexName, "_doc").setSource("num", i)).collect(toList())); - ensureGreen(indexName); - assertAcked(client().admin().indices().prepareClose(indexName).get()); - assertIndexIsClosed(indexName); - assertNoFileBasedRecovery(indexName); - String syncId = null; - for (ShardStats shardStats : getShardStats(indexName)) { - CommitStats commitStats = shardStats.getCommitStats(); - if (syncId == null) { - syncId = commitStats.syncId(); - } - assertThat(shardStats.getShardRouting() + " commit [" + commitStats + "]", - commitStats.syncId(), allOf(notNullValue(), equalTo(syncId))); - } - // Open a closed index should execute syncId recovery - assertAcked(client().admin().indices().prepareOpen(indexName).get()); - assertIndexIsOpened(indexName); - ensureGreen(indexName); - assertNoFileBasedRecovery(indexName); - assertSyncIdExists(indexName, syncId); - internalCluster().assertSameDocIdsOnShards(); - - // Close should not overwrite syncId if there's no write activity since the last close - assertAcked(client().admin().indices().prepareClose(indexName).get()); - assertIndexIsClosed(indexName); - ensureGreen(indexName); - assertNoFileBasedRecovery(indexName); - assertSyncIdExists(indexName, syncId); - internalCluster().assertSameDocIdsOnShards(); - - // Open for indexing - assertAcked(client().admin().indices().prepareOpen(indexName).get()); - assertIndexIsOpened(indexName); - indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, randomIntBetween(1, 50)) - .mapToObj(i -> client().prepareIndex(indexName, "_doc").setSource("num", i)).collect(toList())); - // Close should overwrite syncId as there's indexing activity since the last close - assertAcked(client().admin().indices().prepareClose(indexName).get()); - assertIndexIsClosed(indexName); - assertNoFileBasedRecovery(indexName); - String newSyncId = null; - for (ShardStats shardStats : getShardStats(indexName)) { - CommitStats commitStats = shardStats.getCommitStats(); - if (newSyncId == null) { - newSyncId = commitStats.syncId(); - } - assertThat(shardStats.getShardRouting() + " commit [" + commitStats + "]", - commitStats.syncId(), allOf(notNullValue(), equalTo(newSyncId))); - } - assertThat(newSyncId, Matchers.not(equalTo(syncId))); - } - static void assertIndexIsClosed(final String... indices) { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); for (String index : indices) { @@ -459,24 +383,4 @@ static void assertException(final Throwable throwable, final String indexName) { fail("Unexpected exception: " + t); } } - - void assertSyncIdExists(String indexName, String expectedSyncId) { - for (ShardStats shardStats : getShardStats(indexName)) { - CommitStats commitStats = shardStats.getCommitStats(); - assertThat(shardStats.getShardRouting() + " commit: " + commitStats, commitStats.syncId(), equalTo(expectedSyncId)); - } - } - - void assertNoFileBasedRecovery(String indexName) { - for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) { - if (recovery.getPrimary() == false) { - assertThat(recovery.getIndex().fileDetails(), empty()); - } - } - } - - ShardStats[] getShardStats(String indexName) { - return client().admin().indices().prepareStats(indexName) - .setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_CLOSED).get().getIndex(indexName).getShards(); - } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index c1cfbeb2e296b..bbb0689a8a7e6 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -210,7 +210,7 @@ public long getNumberOfOptimizedIndexing() { } @Override - protected void verifyEngineBeforeIndexClosing() { + public void verifyEngineBeforeIndexClosing() throws IllegalStateException { // the value of the global checkpoint is not verified when the following engine is closed, // allowing it to be closed even in the case where all operations have not been fetched and // processed from the leader and the operations history has gaps. This way the following diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 412ff7785ec3f..e3d997886334b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -16,7 +16,6 @@ import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.CheckedBiFunction; import org.elasticsearch.common.Randomness; -import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; @@ -644,10 +643,10 @@ public void testProcessOnceOnPrimary() throws Exception { } /** - * Test that {@link FollowingEngine#prepareEngineBeforeIndexClosing(String)} never fails + * Test that {@link FollowingEngine#verifyEngineBeforeIndexClosing()} never fails * whatever the value of the global checkpoint to check is. */ - public void testPrepareShardBeforeIndexClosingIsNoOp() throws IOException { + public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException { final long seqNo = randomIntBetween(0, Integer.MAX_VALUE); runIndexTest( seqNo, @@ -655,9 +654,7 @@ public void testPrepareShardBeforeIndexClosingIsNoOp() throws IOException { (followingEngine, index) -> { globalCheckpoint.set(randomNonNegativeLong()); try { - String syncId = UUIDs.randomBase64UUID(); - followingEngine.prepareEngineBeforeIndexClosing(syncId); - assertThat(followingEngine.commitStats().syncId(), equalTo(syncId)); + followingEngine.verifyEngineBeforeIndexClosing(); } catch (final IllegalStateException e) { fail("Following engine pre-closing verifications failed"); } From 042ceb4266804450109e0429f334785070959e3c Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 29 Apr 2019 14:03:59 +0200 Subject: [PATCH 11/14] Noop peer recoveries on closed index --- .../index/engine/InternalEngine.java | 4 ++ .../index/engine/ReadOnlyEngine.java | 2 +- .../indices/state/CloseIndexIT.java | 37 +++++++++++++++++++ 3 files changed, 42 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 654d31d22671a..5fef835cbb167 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2558,6 +2558,10 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS @Override public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException { final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint(); + // avoid scanning translog if not necessary + if (startingSeqNo > currentLocalCheckpoint) { + return true; + } final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) { Translog.Operation operation; diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 777aff88e9dbc..da4eed8a86d17 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -300,7 +300,7 @@ public int estimateNumberOfHistoryOperations(String source, MapperService mapper @Override public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException { - return false; + return startingSeqNo > seqNoStats.getLocalCheckpoint(); } @Override diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index af98ba990b253..9b86d49f044b8 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -34,6 +34,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.indices.IndexClosedException; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.test.ESIntegTestCase; @@ -50,6 +51,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; @@ -338,6 +340,33 @@ public void testCloseIndexWaitForActiveShards() throws Exception { assertIndexIsClosed(indexName); } + public void testNoopPeerRecoveriesWhenIndexClosed() throws Exception { + final String indexName = "peer-recovery-test"; + int numberOfReplicas = between(1, 2); + internalCluster().ensureAtLeastNumDataNodes(numberOfReplicas + between(1, 2)); + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) + .put("index.routing.rebalance.enable", "none") + .build()); + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, randomIntBetween(0, 50)) + .mapToObj(i -> client().prepareIndex(indexName, "_doc").setSource("num", i)).collect(toList())); + ensureGreen(indexName); + + // Closing an index should execute noop peer recovery + assertAcked(client().admin().indices().prepareClose(indexName).get()); + assertIndexIsClosed(indexName); + ensureGreen(indexName); + assertNoFileBasedRecovery(indexName); + + // Open a closed index should execute noop recovery + assertAcked(client().admin().indices().prepareOpen(indexName).get()); + assertIndexIsOpened(indexName); + ensureGreen(indexName); + assertNoFileBasedRecovery(indexName); + internalCluster().assertSameDocIdsOnShards(); + } + static void assertIndexIsClosed(final String... indices) { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); for (String index : indices) { @@ -383,4 +412,12 @@ static void assertException(final Throwable throwable, final String indexName) { fail("Unexpected exception: " + t); } } + + void assertNoFileBasedRecovery(String indexName) { + for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) { + if (recovery.getPrimary() == false) { + assertThat(recovery.getIndex().fileDetails(), empty()); + } + } + } } From 35c527bb381847641b4087387a23c887e68fabfe Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 29 Apr 2019 14:23:35 -0400 Subject: [PATCH 12/14] strengthen operation-based condition --- .../index/engine/ReadOnlyEngine.java | 3 +- .../indices/state/CloseIndexIT.java | 36 ++++++++++--------- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index da4eed8a86d17..e8efa7bf8b182 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -300,7 +300,8 @@ public int estimateNumberOfHistoryOperations(String source, MapperService mapper @Override public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException { - return startingSeqNo > seqNoStats.getLocalCheckpoint(); + // we can do operation-based recovery if we don't have to replay any operation. + return startingSeqNo > seqNoStats.getMaxSeqNo(); } @Override diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index 9b86d49f044b8..249afe2773e0e 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -349,22 +349,26 @@ public void testNoopPeerRecoveriesWhenIndexClosed() throws Exception { .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) .put("index.routing.rebalance.enable", "none") .build()); - indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, randomIntBetween(0, 50)) - .mapToObj(i -> client().prepareIndex(indexName, "_doc").setSource("num", i)).collect(toList())); - ensureGreen(indexName); - - // Closing an index should execute noop peer recovery - assertAcked(client().admin().indices().prepareClose(indexName).get()); - assertIndexIsClosed(indexName); - ensureGreen(indexName); - assertNoFileBasedRecovery(indexName); - - // Open a closed index should execute noop recovery - assertAcked(client().admin().indices().prepareOpen(indexName).get()); - assertIndexIsOpened(indexName); - ensureGreen(indexName); - assertNoFileBasedRecovery(indexName); - internalCluster().assertSameDocIdsOnShards(); + int iterations = between(1, 3); + for (int iter = 0; iter < iterations; iter++) { + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(0, 50)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + ensureGreen(indexName); + + // Closing an index should execute noop peer recovery + assertAcked(client().admin().indices().prepareClose(indexName).get()); + assertIndexIsClosed(indexName); + ensureGreen(indexName); + assertNoFileBasedRecovery(indexName); + internalCluster().assertSameDocIdsOnShards(); + + // Open a closed index should execute noop recovery + assertAcked(client().admin().indices().prepareOpen(indexName).get()); + assertIndexIsOpened(indexName); + ensureGreen(indexName); + assertNoFileBasedRecovery(indexName); + internalCluster().assertSameDocIdsOnShards(); + } } static void assertIndexIsClosed(final String... indices) { From b50d3f24aff861651e1325d087c849f29ffa6b92 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 30 Apr 2019 14:15:24 -0400 Subject: [PATCH 13/14] add recover existing replica test --- .../indices/state/CloseIndexIT.java | 49 ++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index 249afe2773e0e..c1df783ba6fe7 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -22,26 +22,31 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalTestCluster; import java.util.ArrayList; import java.util.List; import java.util.Locale; import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; import java.util.stream.IntStream; import static java.util.Collections.emptySet; @@ -55,6 +60,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; public class CloseIndexIT extends ESIntegTestCase { @@ -341,7 +347,7 @@ public void testCloseIndexWaitForActiveShards() throws Exception { } public void testNoopPeerRecoveriesWhenIndexClosed() throws Exception { - final String indexName = "peer-recovery-test"; + final String indexName = "noop-peer-recovery-test"; int numberOfReplicas = between(1, 2); internalCluster().ensureAtLeastNumDataNodes(numberOfReplicas + between(1, 2)); createIndex(indexName, Settings.builder() @@ -371,6 +377,47 @@ public void testNoopPeerRecoveriesWhenIndexClosed() throws Exception { } } + public void testRecoverExistingReplica() throws Exception { + final String indexName = "test-recover-existing-replica"; + internalCluster().ensureAtLeastNumDataNodes(2); + List dataNodes = randomSubsetOf(2, Sets.newHashSet( + clusterService().state().nodes().getDataNodes().valuesIt()).stream().map(DiscoveryNode::getName).collect(Collectors.toSet())); + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put("index.routing.allocation.include._name", String.join(",", dataNodes)) + .build()); + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(0, 50)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + ensureGreen(indexName); + if (randomBoolean()) { + client().admin().indices().prepareFlush(indexName).get(); + } else { + client().admin().indices().prepareSyncedFlush(indexName).get(); + } + // index more documents while one shard copy is offline + internalCluster().restartNode(dataNodes.get(1), new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + Client client = client(dataNodes.get(0)); + int moreDocs = randomIntBetween(1, 50); + for (int i = 0; i < moreDocs; i++) { + client.prepareIndex(indexName, "_doc").setSource("num", i).get(); + } + assertAcked(client.admin().indices().prepareClose(indexName)); + return super.onNodeStopped(nodeName); + } + }); + assertIndexIsClosed(indexName); + ensureGreen(indexName); + internalCluster().assertSameDocIdsOnShards(); + for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) { + if (recovery.getPrimary() == false) { + assertThat(recovery.getIndex().fileDetails(), not(empty())); + } + } + } + static void assertIndexIsClosed(final String... indices) { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); for (String index : indices) { From c1d33244d13c228c5b959fe23c5bce95df1c262a Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 3 May 2019 08:46:05 -0400 Subject: [PATCH 14/14] add test comment --- .../java/org/elasticsearch/indices/state/CloseIndexIT.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index c1df783ba6fe7..740034f12ecc5 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -377,6 +377,9 @@ public void testNoopPeerRecoveriesWhenIndexClosed() throws Exception { } } + /** + * Ensures that if a replica of a closed index does not have the same content as the primary, then a file-based recovery will occur. + */ public void testRecoverExistingReplica() throws Exception { final String indexName = "test-recover-existing-replica"; internalCluster().ensureAtLeastNumDataNodes(2);