From ee3255d9b8b5e3e53261642fe2bfe13c1a6a7867 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 3 Feb 2018 09:53:11 -0500 Subject: [PATCH 01/12] Make primary-replica resync failures less lenient --- .../TransportResyncReplicationAction.java | 24 +++- .../TransportReplicationAction.java | 24 ++++ .../replication/TransportWriteAction.java | 31 +---- .../action/resync/ResyncReplicationIT.java | 131 ++++++++++++++++++ .../test/engine/MockInternalEngine.java | 21 ++- 5 files changed, 199 insertions(+), 32 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationIT.java diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 32614c636f128..ae5044846c587 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -45,6 +46,7 @@ import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; +import java.util.function.Consumer; import java.util.function.Supplier; public class TransportResyncReplicationAction extends TransportWriteAction onPrimaryDemoted, Consumer onIgnoredFailure) { + shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, false, message, exception, + createShardActionListener(onSuccess, onPrimaryDemoted, 
onIgnoredFailure)); + } + } } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 1a57b6a5d9500..095c3d146b0bd 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -1172,6 +1172,30 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, R // "alive" if it were to be marked as stale. onSuccess.run(); } + + protected final ShardStateAction.Listener createShardActionListener(final Runnable onSuccess, + final Consumer onPrimaryDemoted, + final Consumer onIgnoredFailure) { + return new ShardStateAction.Listener() { + @Override + public void onSuccess() { + onSuccess.run(); + } + + @Override + public void onFailure(Exception shardFailedError) { + if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) { + onPrimaryDemoted.accept(shardFailedError); + } else { + // these can occur if the node is shutting down and are okay + // any other exception here is not expected and merits investigation + assert shardFailedError instanceof TransportException || + shardFailedError instanceof NodeClosedException : shardFailedError; + onIgnoredFailure.accept(shardFailedError); + } + } + }; + } } /** diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 28b8f0826cd91..2a3e8be7aa8bb 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -384,41 +384,16 @@ class WriteActionReplicasProxy extends ReplicasProxy { @Override 
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { - - logger.warn((org.apache.logging.log4j.util.Supplier) - () -> new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception); + logger.warn(new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception); shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception, - createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); + createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); } @Override public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, - createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); - } - - private ShardStateAction.Listener createListener(final Runnable onSuccess, final Consumer onPrimaryDemoted, - final Consumer onIgnoredFailure) { - return new ShardStateAction.Listener() { - @Override - public void onSuccess() { - onSuccess.run(); - } - - @Override - public void onFailure(Exception shardFailedError) { - if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) { - onPrimaryDemoted.accept(shardFailedError); - } else { - // these can occur if the node is shutting down and are okay - // any other exception here is not expected and merits investigation - assert shardFailedError instanceof TransportException || - shardFailedError instanceof NodeClosedException : shardFailedError; - onIgnoredFailure.accept(shardFailedError); - } - } - }; + createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); } } } diff --git a/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationIT.java 
b/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationIT.java new file mode 100644 index 0000000000000..f5b70903d3c32 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationIT.java @@ -0,0 +1,131 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.action.resync; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.MockEngineFactoryPlugin; +import org.elasticsearch.index.engine.EngineException; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.engine.MockInternalEngine; + +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +public class ResyncReplicationIT extends ESIntegTestCase { + + @Override + protected Collection> getMockPlugins() { + final Set> mockPlugins = new HashSet<>(super.getMockPlugins()); + mockPlugins.add(MockEngineFactoryPlugin.class); + return mockPlugins; + } + + /** + * This test asserts that replicas failed to execute resync operations will be failed but not marked as stale. 
+ */ + public void testFailResyncFailedReplicasButNotMarkAsStale() throws Exception { + final int numberOfReplicas = between(2, 5); + internalCluster().startMasterOnlyNode(); + final String primaryNode = internalCluster().startDataOnlyNode(); + assertAcked( + prepareCreate("test", Settings.builder().put(indexSettings()) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas))); + ensureYellow(); + final List replicaNodes = internalCluster().startDataOnlyNodes(numberOfReplicas); + ensureGreen(); + logger.info("--> Disable shard allocation"); + assertAcked( + client().admin().cluster().prepareUpdateSettings() + .setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "none")).get()); + final ShardId shardId = new ShardId(clusterService().state().metaData().index("test").getIndex(), 0); + logger.info("--> Indexing with gap in seqno to ensure that some operations will be replayed in resync"); + int numDocs = scaledRandomIntBetween(10, 100); + for (int i = 0; i < numDocs; i++) { + index("test", "doc", Integer.toString(i)); + } + getEngine(primaryNode, shardId).getLocalCheckpointTracker().generateSeqNo(); // Make gap + int moreDocs = scaledRandomIntBetween(1, 10); + for (int i = 0; i < moreDocs; i++) { + index("test", "doc", Integer.toString(numDocs + i)); + } + logger.info("--> Promote a new primary and fail one resync operation on one replica"); + final AtomicReference failedNode = new AtomicReference<>(); + for (String replicaNode : replicaNodes) { + getEngine(replicaNode, shardId).setPreIndexingInterceptor((op) -> { + if (failedNode.compareAndSet(null, replicaNode)) { + throw new EngineException(shardId, "Intercepted to fail an indexing operation on [{}]", replicaNode); + } + }); + } + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); + assertBusy(() -> { + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + 
assertThat(clusterState.routingTable().shardRoutingTable(shardId).activeShards(), hasSize(numberOfReplicas - 1)); + assertThat(clusterState.metaData().index("test").inSyncAllocationIds(shardId.id()), hasSize(numberOfReplicas + 1)); + for (String replicaNode : replicaNodes) { + final IndexShard shard = internalCluster().getInstance(IndicesService.class, replicaNode).getShardOrNull(shardId); + if (replicaNode.equals(failedNode.get()) == false) { + assertThat(shard.getLocalCheckpoint(), equalTo((long) numDocs + moreDocs)); + } else { + assertThat(shard, nullValue()); + } + } + }); + logger.info("--> Re-enable shard allocation; resync-failed shard will be recovered"); + assertAcked( + client().admin().cluster().prepareUpdateSettings() + .setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "all")).get()); + assertBusy(() -> { + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + assertThat(clusterState.routingTable().shardRoutingTable(shardId).activeShards(), hasSize(numberOfReplicas)); + assertThat(clusterState.metaData().index("test").inSyncAllocationIds(shardId.id()), hasSize(numberOfReplicas + 1)); + assertThat(internalCluster().getInstance(IndicesService.class, failedNode.get()).getShardOrNull(shardId), notNullValue()); + for (String replicaNode : replicaNodes) { + final IndexShard shard = internalCluster().getInstance(IndicesService.class, replicaNode).getShardOrNull(shardId); + assertThat(shard.getLocalCheckpoint(), equalTo((long) numDocs + moreDocs)); + } + }); + } + + MockInternalEngine getEngine(String node, ShardId shardId) { + final IndexShard indexShard = internalCluster().getInstance(IndicesService.class, node).getShardOrNull(shardId); + return (MockInternalEngine) IndexShardTestCase.getEngine(indexShard); + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java 
b/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java index 92c7b4d9fc0d0..aa3620cee3bd6 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java +++ b/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java @@ -21,16 +21,17 @@ import org.apache.lucene.index.FilterDirectoryReader; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.ReferenceManager; -import org.apache.lucene.search.SearcherManager; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.InternalEngine; import java.io.IOException; +import java.util.function.Consumer; -final class MockInternalEngine extends InternalEngine { +public final class MockInternalEngine extends InternalEngine { private MockEngineSupport support; private Class wrapperClass; + private volatile Consumer preIndexingInterceptor; MockInternalEngine(EngineConfig config, Class wrapper) throws EngineException { super(config); @@ -83,4 +84,20 @@ protected Searcher newSearcher(String source, IndexSearcher searcher, ReferenceM final Searcher engineSearcher = super.newSearcher(source, searcher, manager); return support().wrapSearcher(source, engineSearcher, searcher, manager); } + + /** + * Installs a preIndexing interceptor which is called before an operation gets executed. 
+ */ + public void setPreIndexingInterceptor(Consumer preIndexingInterceptor) { + this.preIndexingInterceptor = preIndexingInterceptor; + } + + @Override + public IndexResult index(Index index) throws IOException { + final Consumer interceptor = this.preIndexingInterceptor; + if (interceptor != null) { + interceptor.accept(index); + } + return super.index(index); + } } From 2faba2fce557a1f1c19c18979aed7dd0313e69e5 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 6 Feb 2018 13:36:13 -0500 Subject: [PATCH 02/12] Do not ignore not-avail-shard exception in write requests --- .../resync/TransportResyncReplicationAction.java | 5 +++++ .../support/replication/ReplicationOperation.java | 11 +++++++---- .../replication/TransportReplicationAction.java | 5 +++++ .../support/replication/TransportWriteAction.java | 5 +++++ 4 files changed, 22 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index ae5044846c587..729502a5f541a 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -203,5 +203,10 @@ public void failShardIfNeeded(ShardRouting replica, String message, Exception ex shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, false, message, exception, createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); } + + @Override + public boolean canIgnoreReplicaFailureException(Exception replicaException) { + return false; + } } } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 817be12a8e2b5..eac4e355098c0 100644 --- 
a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -26,12 +26,10 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.support.ActiveShardCount; -import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; @@ -41,7 +39,6 @@ import java.util.Collections; import java.util.List; import java.util.Locale; -import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -186,7 +183,7 @@ public void onFailure(Exception replicaException) { shard, replicaRequest), replicaException); - if (TransportActions.isShardNotAvailableException(replicaException)) { + if (replicasProxy.canIgnoreReplicaFailureException(replicaException)) { decPendingAndFinishIfNeeded(); } else { RestStatus restStatus = ExceptionsHelper.status(replicaException); @@ -353,6 +350,12 @@ public interface Replicas> { */ void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpoint, ActionListener listener); + /** + * Checks if the given replica exception can be ignored by this replication request. + * Some exceptions can be lenient by non-write requests but must be strictly handled by write requests. 
+ */ + boolean canIgnoreReplicaFailureException(Exception replicaException); + /** * Fail the specified shard if needed, removing it from the current set * of active shards. Whether a failure is needed is left up to the diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 095c3d146b0bd..04d03c16fe25c 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -1153,6 +1153,11 @@ public void performOn( sendReplicaRequest(replicaRequest, node, listener); } + @Override + public boolean canIgnoreReplicaFailureException(Exception replicaException) { + return TransportActions.isShardNotAvailableException(replicaException); + } + @Override public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 2a3e8be7aa8bb..b4e10e58fa77b 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -395,5 +395,10 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, R shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); } + + @Override + public boolean canIgnoreReplicaFailureException(Exception replicaException) { + return false; + } } } From 
7c38394ad45c8e4a9bff1765d49fa966e3e4a706 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 6 Feb 2018 13:52:54 -0500 Subject: [PATCH 03/12] Ignore by default --- .../action/support/replication/ReplicationOperation.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index eac4e355098c0..8ca32970d6d56 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -354,7 +354,9 @@ public interface Replicas> { * Checks if the given replica exception can be ignored by this replication request. * Some exceptions can be lenient by non-write requests but must be strictly handled by write requests. */ - boolean canIgnoreReplicaFailureException(Exception replicaException); + default boolean canIgnoreReplicaFailureException(Exception replicaException) { + return false; + } /** * Fail the specified shard if needed, removing it from the current set From a4c1a07a4e8ec022698a1d725a0ee3f57ca2c13d Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 6 Feb 2018 21:02:17 -0500 Subject: [PATCH 04/12] Revert ` not-avail-shard exception in write requests` --- .../resync/TransportResyncReplicationAction.java | 5 ----- .../support/replication/ReplicationOperation.java | 13 ++++--------- .../replication/TransportReplicationAction.java | 5 ----- .../support/replication/TransportWriteAction.java | 5 ----- 4 files changed, 4 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 729502a5f541a..ae5044846c587 100644 --- 
a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -203,10 +203,5 @@ public void failShardIfNeeded(ShardRouting replica, String message, Exception ex shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, false, message, exception, createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); } - - @Override - public boolean canIgnoreReplicaFailureException(Exception replicaException) { - return false; - } } } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 8ca32970d6d56..817be12a8e2b5 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -26,10 +26,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; @@ -39,6 +41,7 @@ import java.util.Collections; import java.util.List; import java.util.Locale; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -183,7 +186,7 @@ public void 
onFailure(Exception replicaException) { shard, replicaRequest), replicaException); - if (replicasProxy.canIgnoreReplicaFailureException(replicaException)) { + if (TransportActions.isShardNotAvailableException(replicaException)) { decPendingAndFinishIfNeeded(); } else { RestStatus restStatus = ExceptionsHelper.status(replicaException); @@ -350,14 +353,6 @@ public interface Replicas> { */ void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpoint, ActionListener listener); - /** - * Checks if the given replica exception can be ignored by this replication request. - * Some exceptions can be lenient by non-write requests but must be strictly handled by write requests. - */ - default boolean canIgnoreReplicaFailureException(Exception replicaException) { - return false; - } - /** * Fail the specified shard if needed, removing it from the current set * of active shards. Whether a failure is needed is left up to the diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 04d03c16fe25c..095c3d146b0bd 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -1153,11 +1153,6 @@ public void performOn( sendReplicaRequest(replicaRequest, node, listener); } - @Override - public boolean canIgnoreReplicaFailureException(Exception replicaException) { - return TransportActions.isShardNotAvailableException(replicaException); - } - @Override public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java 
b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index b4e10e58fa77b..2a3e8be7aa8bb 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -395,10 +395,5 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, R shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); } - - @Override - public boolean canIgnoreReplicaFailureException(Exception replicaException) { - return false; - } } } From 6c94d2075363117aa4472ed4b19cfa87eb03c61f Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 6 Feb 2018 21:06:06 -0500 Subject: [PATCH 05/12] Do not ignore not-available-shard exception --- .../replication/ReplicationOperation.java | 21 +++++---------- .../ReplicationOperationTests.java | 26 +++++-------------- 2 files changed, 13 insertions(+), 34 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 817be12a8e2b5..7c20965213e5e 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -26,12 +26,10 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.support.ActiveShardCount; -import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import 
org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; @@ -41,7 +39,6 @@ import java.util.Collections; import java.util.List; import java.util.Locale; -import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -186,17 +183,13 @@ public void onFailure(Exception replicaException) { shard, replicaRequest), replicaException); - if (TransportActions.isShardNotAvailableException(replicaException)) { - decPendingAndFinishIfNeeded(); - } else { - RestStatus restStatus = ExceptionsHelper.status(replicaException); - shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure( - shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false)); - String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard); - replicasProxy.failShardIfNeeded(shard, message, - replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded, - ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded()); - } + RestStatus restStatus = ExceptionsHelper.status(replicaException); + shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure( + shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false)); + String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard); + replicasProxy.failShardIfNeeded(shard, message, + replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded, + ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded()); } }); } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java 
b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 858cbcce19989..6328976963b9a 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -29,15 +29,12 @@ import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.index.shard.IndexShardNotStartedException; -import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; @@ -54,7 +51,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Supplier; -import java.util.stream.Collectors; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary; @@ -96,21 +92,11 @@ public void testReplication() throws Exception { final Set expectedReplicas = getExpectedReplicas(shardId, initialState, trackedShards); final Map expectedFailures = new HashMap<>(); - final Set expectedFailedShards = new HashSet<>(); for (ShardRouting replica : expectedReplicas) { if (randomBoolean()) { - Exception t; - boolean criticalFailure = randomBoolean(); - if 
(criticalFailure) { - t = new CorruptIndexException("simulated", (String) null); - } else { - t = new IndexShardNotStartedException(shardId, IndexShardState.RECOVERING); - } - logger.debug("--> simulating failure on {} with [{}]", replica, t.getClass().getSimpleName()); - expectedFailures.put(replica, t); - if (criticalFailure) { - expectedFailedShards.add(replica); - } + final Exception ex = new CorruptIndexException("simulated", (String) null); + logger.debug("--> simulating failure on {} with [{}]", replica, ex.getClass().getSimpleName()); + expectedFailures.put(replica, ex); } } @@ -125,12 +111,12 @@ public void testReplication() throws Exception { assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); assertThat(request.processedOnReplicas, equalTo(expectedReplicas)); - assertThat(replicasProxy.failedReplicas, equalTo(expectedFailedShards)); + assertThat(replicasProxy.failedReplicas, equalTo(expectedFailures.keySet())); assertThat(replicasProxy.markedAsStaleCopies, equalTo(staleAllocationIds)); assertTrue("listener is not marked as done", listener.isDone()); ShardInfo shardInfo = listener.actionGet().getShardInfo(); - assertThat(shardInfo.getFailed(), equalTo(expectedFailedShards.size())); - assertThat(shardInfo.getFailures(), arrayWithSize(expectedFailedShards.size())); + assertThat(shardInfo.getFailed(), equalTo(expectedFailures.size())); + assertThat(shardInfo.getFailures(), arrayWithSize(expectedFailures.size())); assertThat(shardInfo.getSuccessful(), equalTo(1 + expectedReplicas.size() - expectedFailures.size())); final List unassignedShards = indexShardRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED); From 1478d0b1b9db17d7b0a03aedc6b59068ce76c825 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 9 Feb 2018 17:00:54 -0500 Subject: [PATCH 06/12] Revert "Do not ignore not-available-shard exception" This reverts commit 6c94d2075363117aa4472ed4b19cfa87eb03c61f. 
--- .../replication/ReplicationOperation.java | 21 ++++++++++----- .../ReplicationOperationTests.java | 26 ++++++++++++++----- 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 7c20965213e5e..817be12a8e2b5 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -26,10 +26,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; @@ -39,6 +41,7 @@ import java.util.Collections; import java.util.List; import java.util.Locale; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -183,13 +186,17 @@ public void onFailure(Exception replicaException) { shard, replicaRequest), replicaException); - RestStatus restStatus = ExceptionsHelper.status(replicaException); - shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure( - shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false)); - String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard); - 
replicasProxy.failShardIfNeeded(shard, message, - replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded, - ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded()); + if (TransportActions.isShardNotAvailableException(replicaException)) { + decPendingAndFinishIfNeeded(); + } else { + RestStatus restStatus = ExceptionsHelper.status(replicaException); + shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure( + shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false)); + String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard); + replicasProxy.failShardIfNeeded(shard, message, + replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded, + ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded()); + } } }); } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 6328976963b9a..858cbcce19989 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -29,12 +29,15 @@ import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.set.Sets; +import 
org.elasticsearch.index.shard.IndexShardNotStartedException; +import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; @@ -51,6 +54,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary; @@ -92,11 +96,21 @@ public void testReplication() throws Exception { final Set expectedReplicas = getExpectedReplicas(shardId, initialState, trackedShards); final Map expectedFailures = new HashMap<>(); + final Set expectedFailedShards = new HashSet<>(); for (ShardRouting replica : expectedReplicas) { if (randomBoolean()) { - final Exception ex = new CorruptIndexException("simulated", (String) null); - logger.debug("--> simulating failure on {} with [{}]", replica, ex.getClass().getSimpleName()); - expectedFailures.put(replica, ex); + Exception t; + boolean criticalFailure = randomBoolean(); + if (criticalFailure) { + t = new CorruptIndexException("simulated", (String) null); + } else { + t = new IndexShardNotStartedException(shardId, IndexShardState.RECOVERING); + } + logger.debug("--> simulating failure on {} with [{}]", replica, t.getClass().getSimpleName()); + expectedFailures.put(replica, t); + if (criticalFailure) { + expectedFailedShards.add(replica); + } } } @@ -111,12 +125,12 @@ public void testReplication() throws Exception { assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); assertThat(request.processedOnReplicas, equalTo(expectedReplicas)); - assertThat(replicasProxy.failedReplicas, equalTo(expectedFailures.keySet())); + assertThat(replicasProxy.failedReplicas, 
equalTo(expectedFailedShards)); assertThat(replicasProxy.markedAsStaleCopies, equalTo(staleAllocationIds)); assertTrue("listener is not marked as done", listener.isDone()); ShardInfo shardInfo = listener.actionGet().getShardInfo(); - assertThat(shardInfo.getFailed(), equalTo(expectedFailures.size())); - assertThat(shardInfo.getFailures(), arrayWithSize(expectedFailures.size())); + assertThat(shardInfo.getFailed(), equalTo(expectedFailedShards.size())); + assertThat(shardInfo.getFailures(), arrayWithSize(expectedFailedShards.size())); assertThat(shardInfo.getSuccessful(), equalTo(1 + expectedReplicas.size() - expectedFailures.size())); final List unassignedShards = indexShardRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED); From c7d14717d7fe0b5974b32c5688fb5f488f9d5aea Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 19 Feb 2018 20:40:14 -0500 Subject: [PATCH 07/12] Naming + comment --- .../resync/TransportResyncReplicationAction.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index ae5044846c587..969da07b3d72e 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -85,8 +85,8 @@ protected ResyncReplicationResponse newResponseInstance() { @Override protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) { - // We treat the resync as best-effort for now and don't mark unavailable and failed shard copies as stale. - return new RsyncActionReplicasProxy(primaryTerm); + // Ensure primary-replica resync mandatory but avoid marking shards as stale during cluster restart. 
+ return new ResyncActionReplicasProxy(primaryTerm); } @Override @@ -188,12 +188,11 @@ public void handleException(TransportException exp) { /** * A proxy for primary-replica resync operations which are performed on replicas when a new primary is promoted. - * Replica shards which fail to execute these resync operation will be failed but won't be marked as stale - * (eg. keep in the in sync set). This is a best-effort to avoid marking shards as stale during cluster restart. + * Replica shards fail to execute resync operations will be failed but won't be marked as stale. */ - class RsyncActionReplicasProxy extends ReplicasProxy { + class ResyncActionReplicasProxy extends ReplicasProxy { - RsyncActionReplicasProxy(long primaryTerm) { + ResyncActionReplicasProxy(long primaryTerm) { super(primaryTerm); } From 1ac7cf53fbaf7e16150d9250e177d7745e1bdc78 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 19 Feb 2018 20:43:47 -0500 Subject: [PATCH 08/12] Move test to PrimaryAllocIT --- .../action/resync/ResyncReplicationIT.java | 131 ------------------ .../cluster/routing/PrimaryAllocationIT.java | 74 ++++++++++ .../test/engine/MockInternalEngine.java | 21 +-- 3 files changed, 76 insertions(+), 150 deletions(-) delete mode 100644 server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationIT.java diff --git a/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationIT.java b/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationIT.java deleted file mode 100644 index f5b70903d3c32..0000000000000 --- a/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationIT.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. 
Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.resync; - -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.MockEngineFactoryPlugin; -import org.elasticsearch.index.engine.EngineException; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.IndexShardTestCase; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.InternalTestCluster; -import org.elasticsearch.test.engine.MockInternalEngine; - -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; - -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; - -@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) -public class 
ResyncReplicationIT extends ESIntegTestCase { - - @Override - protected Collection> getMockPlugins() { - final Set> mockPlugins = new HashSet<>(super.getMockPlugins()); - mockPlugins.add(MockEngineFactoryPlugin.class); - return mockPlugins; - } - - /** - * This test asserts that replicas failed to execute resync operations will be failed but not marked as stale. - */ - public void testFailResyncFailedReplicasButNotMarkAsStale() throws Exception { - final int numberOfReplicas = between(2, 5); - internalCluster().startMasterOnlyNode(); - final String primaryNode = internalCluster().startDataOnlyNode(); - assertAcked( - prepareCreate("test", Settings.builder().put(indexSettings()) - .put(SETTING_NUMBER_OF_SHARDS, 1) - .put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas))); - ensureYellow(); - final List replicaNodes = internalCluster().startDataOnlyNodes(numberOfReplicas); - ensureGreen(); - logger.info("--> Disable shard allocation"); - assertAcked( - client().admin().cluster().prepareUpdateSettings() - .setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "none")).get()); - final ShardId shardId = new ShardId(clusterService().state().metaData().index("test").getIndex(), 0); - logger.info("--> Indexing with gap in seqno to ensure that some operations will be replayed in resync"); - int numDocs = scaledRandomIntBetween(10, 100); - for (int i = 0; i < numDocs; i++) { - index("test", "doc", Integer.toString(i)); - } - getEngine(primaryNode, shardId).getLocalCheckpointTracker().generateSeqNo(); // Make gap - int moreDocs = scaledRandomIntBetween(1, 10); - for (int i = 0; i < moreDocs; i++) { - index("test", "doc", Integer.toString(numDocs + i)); - } - logger.info("--> Promote a new primary and fail one resync operation on one replica"); - final AtomicReference failedNode = new AtomicReference<>(); - for (String replicaNode : replicaNodes) { - getEngine(replicaNode, shardId).setPreIndexingInterceptor((op) -> { - if (failedNode.compareAndSet(null, 
replicaNode)) { - throw new EngineException(shardId, "Intercepted to fail an indexing operation on [{}]", replicaNode); - } - }); - } - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); - assertBusy(() -> { - final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - assertThat(clusterState.routingTable().shardRoutingTable(shardId).activeShards(), hasSize(numberOfReplicas - 1)); - assertThat(clusterState.metaData().index("test").inSyncAllocationIds(shardId.id()), hasSize(numberOfReplicas + 1)); - for (String replicaNode : replicaNodes) { - final IndexShard shard = internalCluster().getInstance(IndicesService.class, replicaNode).getShardOrNull(shardId); - if (replicaNode.equals(failedNode.get()) == false) { - assertThat(shard.getLocalCheckpoint(), equalTo((long) numDocs + moreDocs)); - } else { - assertThat(shard, nullValue()); - } - } - }); - logger.info("--> Re-enable shard allocation; resync-failed shard will be recovered"); - assertAcked( - client().admin().cluster().prepareUpdateSettings() - .setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "all")).get()); - assertBusy(() -> { - final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - assertThat(clusterState.routingTable().shardRoutingTable(shardId).activeShards(), hasSize(numberOfReplicas)); - assertThat(clusterState.metaData().index("test").inSyncAllocationIds(shardId.id()), hasSize(numberOfReplicas + 1)); - assertThat(internalCluster().getInstance(IndicesService.class, failedNode.get()).getShardOrNull(shardId), notNullValue()); - for (String replicaNode : replicaNodes) { - final IndexShard shard = internalCluster().getInstance(IndicesService.class, replicaNode).getShardOrNull(shardId); - assertThat(shard.getLocalCheckpoint(), equalTo((long) numDocs + moreDocs)); - } - }); - } - - MockInternalEngine getEngine(String node, ShardId shardId) { - final IndexShard indexShard = 
internalCluster().getInstance(IndicesService.class, node).getShardOrNull(shardId); - return (MockInternalEngine) IndexShardTestCase.getEngine(indexShard); - } -} diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java index 86dd2dfe18904..f7b9c4cf99893 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java @@ -22,6 +22,7 @@ import com.carrotsearch.hppc.cursors.IntObjectCursor; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequestBuilder; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -31,6 +32,10 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.gateway.GatewayAllocator; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; @@ -43,15 +48,21 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static 
org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class PrimaryAllocationIT extends ESIntegTestCase { @@ -309,4 +320,67 @@ public void testForceAllocatePrimaryOnNoDecision() throws Exception { assertEquals(1, client().admin().cluster().prepareState().get().getState() .routingTable().index(indexName).shardsWithState(ShardRoutingState.STARTED).size()); } + + /** + * This test asserts that replicas failed to execute resync operations will be failed but not marked as stale. + */ + public void testFailResyncFailedReplicasButNotMarkAsStale() throws Exception { + String master = internalCluster().startMasterOnlyNode(Settings.EMPTY); + final int numberOfReplicas = between(2, 3); + assertAcked( + prepareCreate("test", Settings.builder().put(indexSettings()) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas))); + final String oldPrimary = internalCluster().startDataOnlyNode(); + final ShardId shardId = new ShardId(clusterService().state().metaData().index("test").getIndex(), 0); + ensureYellow(); + final Set replicaNodes = new HashSet<>(internalCluster().startDataOnlyNodes(numberOfReplicas)); + ensureGreen(); + assertAcked( + client(master).admin().cluster().prepareUpdateSettings() + .setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "none")).get()); + logger.info("--> Indexing with gap in seqno to ensure that some operations will be replayed in resync"); + long numDocs = scaledRandomIntBetween(5, 50); + for (int i = 0; i < numDocs; 
i++) { + IndexResponse indexResult = index("test", "doc", Long.toString(i)); + assertThat(indexResult.getShardInfo().getSuccessful(), equalTo(numberOfReplicas + 1)); + } + final IndexShard primaryShard = internalCluster().getInstance(IndicesService.class, oldPrimary).getShardOrNull(shardId); + IndexShardTestCase.getEngine(primaryShard).getLocalCheckpointTracker().generateSeqNo(); // Make gap in seqno. + long moreDocs = scaledRandomIntBetween(1, 10); + for (int i = 0; i < moreDocs; i++) { + IndexResponse indexResult = index("test", "doc", Long.toString(numDocs + i)); + assertThat(indexResult.getShardInfo().getSuccessful(), equalTo(numberOfReplicas + 1)); + } + final Set isolatedReplicas = Sets.newHashSet(randomSubsetOf(between(1, numberOfReplicas - 1), replicaNodes)); + final Set connectedReplicas = Sets.difference(replicaNodes, isolatedReplicas); + final Set connectedNodes = Sets.newHashSet(master, oldPrimary); + connectedNodes.addAll(connectedReplicas); + NetworkDisruption partition = new NetworkDisruption(new TwoPartitions(connectedNodes, isolatedReplicas), new NetworkDisconnect()); + internalCluster().setDisruptionScheme(partition); + logger.info("--> isolating some replicas from cluster during primary-replica resync"); + partition.startDisrupting(); + ensureStableCluster(connectedNodes.size(), master); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(oldPrimary)); + assertBusy(() -> { + ClusterState state = client(master).admin().cluster().prepareState().get().getState(); + assertThat(state.routingTable().shardRoutingTable(shardId).activeShards(), hasSize(connectedReplicas.size())); + assertThat(state.metaData().index("test").inSyncAllocationIds(shardId.id()), hasSize(numberOfReplicas + 1)); + }, 1, TimeUnit.MINUTES); + assertAcked( + client(master).admin().cluster().prepareUpdateSettings() + .setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "all")).get()); + partition.stopDisrupting(); + logger.info("--> stop 
disrupting network and re-enable allocation"); + assertBusy(() -> { + ClusterState state = client(master).admin().cluster().prepareState().get().getState(); + assertThat(state.routingTable().shardRoutingTable(shardId).activeShards(), hasSize(numberOfReplicas)); + assertThat(state.metaData().index("test").inSyncAllocationIds(shardId.id()), hasSize(numberOfReplicas + 1)); + for (String node : replicaNodes) { + IndexShard shard = internalCluster().getInstance(IndicesService.class, node).getShardOrNull(shardId); + assertThat(shard.getLocalCheckpoint(), equalTo(numDocs + moreDocs)); + } + }); + } + } diff --git a/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java b/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java index aa3620cee3bd6..92c7b4d9fc0d0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java +++ b/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java @@ -21,17 +21,16 @@ import org.apache.lucene.index.FilterDirectoryReader; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.search.SearcherManager; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.InternalEngine; import java.io.IOException; -import java.util.function.Consumer; -public final class MockInternalEngine extends InternalEngine { +final class MockInternalEngine extends InternalEngine { private MockEngineSupport support; private Class wrapperClass; - private volatile Consumer preIndexingInterceptor; MockInternalEngine(EngineConfig config, Class wrapper) throws EngineException { super(config); @@ -84,20 +83,4 @@ protected Searcher newSearcher(String source, IndexSearcher searcher, ReferenceM final Searcher engineSearcher = super.newSearcher(source, searcher, manager); return support().wrapSearcher(source, 
engineSearcher, searcher, manager); } - - /** - * Installs a preIndexing interceptor which is called before an operation gets executed. - */ - public void setPreIndexingInterceptor(Consumer preIndexingInterceptor) { - this.preIndexingInterceptor = preIndexingInterceptor; - } - - @Override - public IndexResult index(Index index) throws IOException { - final Consumer interceptor = this.preIndexingInterceptor; - if (interceptor != null) { - interceptor.accept(index); - } - return super.index(index); - } } From 1dcd44595e73667fa51584265b655b9399f640df Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 20 Feb 2018 09:48:40 -0500 Subject: [PATCH 09/12] more comment --- .../action/resync/TransportResyncReplicationAction.java | 2 +- .../org/elasticsearch/cluster/routing/PrimaryAllocationIT.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 969da07b3d72e..4e7c66afdcaf0 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -85,7 +85,6 @@ protected ResyncReplicationResponse newResponseInstance() { @Override protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) { - // Ensure primary-replica resync mandatory but avoid marking shards as stale during cluster restart. return new ResyncActionReplicasProxy(primaryTerm); } @@ -189,6 +188,7 @@ public void handleException(TransportException exp) { /** * A proxy for primary-replica resync operations which are performed on replicas when a new primary is promoted. * Replica shards fail to execute resync operations will be failed but won't be marked as stale. 
+ * This avoids marking shards as stale during cluster restart but enforces primary-replica resync mandatory. */ class ResyncActionReplicasProxy extends ReplicasProxy { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java index f7b9c4cf99893..12b878ecbf626 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java @@ -324,7 +324,7 @@ public void testForceAllocatePrimaryOnNoDecision() throws Exception { /** * This test asserts that replicas failed to execute resync operations will be failed but not marked as stale. */ - public void testFailResyncFailedReplicasButNotMarkAsStale() throws Exception { + public void testPrimaryReplicaResyncFailed() throws Exception { String master = internalCluster().startMasterOnlyNode(Settings.EMPTY); final int numberOfReplicas = between(2, 3); assertAcked( @@ -362,6 +362,7 @@ public void testFailResyncFailedReplicasButNotMarkAsStale() throws Exception { partition.startDisrupting(); ensureStableCluster(connectedNodes.size(), master); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(oldPrimary)); + // Fails replicas but not mark them as stale. 
assertBusy(() -> { ClusterState state = client(master).admin().cluster().prepareState().get().getState(); assertThat(state.routingTable().shardRoutingTable(shardId).activeShards(), hasSize(connectedReplicas.size())); From 2f97ff8c2371d138af0ee1937ab4076635b2387f Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 22 Feb 2018 09:00:43 -0500 Subject: [PATCH 10/12] simplify test --- .../cluster/routing/PrimaryAllocationIT.java | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java index 12b878ecbf626..8b2b2375607e2 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java @@ -60,6 +60,7 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -333,7 +334,6 @@ public void testPrimaryReplicaResyncFailed() throws Exception { .put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas))); final String oldPrimary = internalCluster().startDataOnlyNode(); final ShardId shardId = new ShardId(clusterService().state().metaData().index("test").getIndex(), 0); - ensureYellow(); final Set replicaNodes = new HashSet<>(internalCluster().startDataOnlyNodes(numberOfReplicas)); ensureGreen(); assertAcked( @@ -352,22 +352,20 @@ public void testPrimaryReplicaResyncFailed() throws Exception { IndexResponse indexResult = index("test", "doc", Long.toString(numDocs + i)); assertThat(indexResult.getShardInfo().getSuccessful(), 
equalTo(numberOfReplicas + 1)); } - final Set isolatedReplicas = Sets.newHashSet(randomSubsetOf(between(1, numberOfReplicas - 1), replicaNodes)); - final Set connectedReplicas = Sets.difference(replicaNodes, isolatedReplicas); - final Set connectedNodes = Sets.newHashSet(master, oldPrimary); - connectedNodes.addAll(connectedReplicas); - NetworkDisruption partition = new NetworkDisruption(new TwoPartitions(connectedNodes, isolatedReplicas), new NetworkDisconnect()); + final Set replicasSide1 = Sets.newHashSet(randomSubsetOf(between(1, numberOfReplicas - 1), replicaNodes)); + final Set replicasSide2 = Sets.difference(replicaNodes, replicasSide1); + NetworkDisruption partition = new NetworkDisruption(new TwoPartitions(replicasSide2, replicasSide1), new NetworkDisconnect()); internalCluster().setDisruptionScheme(partition); - logger.info("--> isolating some replicas from cluster during primary-replica resync"); + logger.info("--> isolating some replicas during primary-replica resync"); partition.startDisrupting(); - ensureStableCluster(connectedNodes.size(), master); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(oldPrimary)); - // Fails replicas but not mark them as stale. + // Checks that we fails replicas in one side but not mark them as stale. 
assertBusy(() -> { ClusterState state = client(master).admin().cluster().prepareState().get().getState(); - assertThat(state.routingTable().shardRoutingTable(shardId).activeShards(), hasSize(connectedReplicas.size())); + assertThat(state.routingTable().shardRoutingTable(shardId).activeShards(), + anyOf(hasSize(replicasSide1.size()), hasSize(replicasSide2.size()))); assertThat(state.metaData().index("test").inSyncAllocationIds(shardId.id()), hasSize(numberOfReplicas + 1)); - }, 1, TimeUnit.MINUTES); + }, 2, TimeUnit.MINUTES); assertAcked( client(master).admin().cluster().prepareUpdateSettings() .setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "all")).get()); From aadeda61df5953f674f382a5618dd187f970ebb8 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 22 Feb 2018 11:59:55 -0500 Subject: [PATCH 11/12] Check active shards in one side --- .../cluster/routing/PrimaryAllocationIT.java | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java index 8b2b2375607e2..b7d801103f017 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java @@ -60,10 +60,11 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.isIn; +import static org.hamcrest.Matchers.not; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 
0) public class PrimaryAllocationIT extends ESIntegTestCase { @@ -345,8 +346,8 @@ public void testPrimaryReplicaResyncFailed() throws Exception { IndexResponse indexResult = index("test", "doc", Long.toString(i)); assertThat(indexResult.getShardInfo().getSuccessful(), equalTo(numberOfReplicas + 1)); } - final IndexShard primaryShard = internalCluster().getInstance(IndicesService.class, oldPrimary).getShardOrNull(shardId); - IndexShardTestCase.getEngine(primaryShard).getLocalCheckpointTracker().generateSeqNo(); // Make gap in seqno. + final IndexShard oldPrimaryShard = internalCluster().getInstance(IndicesService.class, oldPrimary).getShardOrNull(shardId); + IndexShardTestCase.getEngine(oldPrimaryShard).getLocalCheckpointTracker().generateSeqNo(); // Make gap in seqno. long moreDocs = scaledRandomIntBetween(1, 10); for (int i = 0; i < moreDocs; i++) { IndexResponse indexResult = index("test", "doc", Long.toString(numDocs + i)); @@ -354,7 +355,7 @@ public void testPrimaryReplicaResyncFailed() throws Exception { } final Set replicasSide1 = Sets.newHashSet(randomSubsetOf(between(1, numberOfReplicas - 1), replicaNodes)); final Set replicasSide2 = Sets.difference(replicaNodes, replicasSide1); - NetworkDisruption partition = new NetworkDisruption(new TwoPartitions(replicasSide2, replicasSide1), new NetworkDisconnect()); + NetworkDisruption partition = new NetworkDisruption(new TwoPartitions(replicasSide1, replicasSide2), new NetworkDisconnect()); internalCluster().setDisruptionScheme(partition); logger.info("--> isolating some replicas during primary-replica resync"); partition.startDisrupting(); @@ -362,10 +363,16 @@ public void testPrimaryReplicaResyncFailed() throws Exception { // Checks that we fails replicas in one side but not mark them as stale. 
assertBusy(() -> { ClusterState state = client(master).admin().cluster().prepareState().get().getState(); - assertThat(state.routingTable().shardRoutingTable(shardId).activeShards(), - anyOf(hasSize(replicasSide1.size()), hasSize(replicasSide2.size()))); + final IndexShardRoutingTable shardRoutingTable = state.routingTable().shardRoutingTable(shardId); + final String newPrimaryNode = state.getRoutingNodes().node(shardRoutingTable.primary.currentNodeId()).node().getName(); + assertThat(newPrimaryNode, not(equalTo(oldPrimary))); + Set selectedPartition = replicasSide1.contains(newPrimaryNode) ? replicasSide1 : replicasSide2; + assertThat(shardRoutingTable.activeShards(), hasSize(selectedPartition.size())); + for (ShardRouting activeShard : shardRoutingTable.activeShards()) { + assertThat(state.getRoutingNodes().node(activeShard.currentNodeId()).node().getName(), isIn(selectedPartition)); + } assertThat(state.metaData().index("test").inSyncAllocationIds(shardId.id()), hasSize(numberOfReplicas + 1)); - }, 2, TimeUnit.MINUTES); + }, 1, TimeUnit.MINUTES); assertAcked( client(master).admin().cluster().prepareUpdateSettings() .setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "all")).get()); From 0329f5b3f85b590913aa42d3431f3834a32f8774 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 9 Mar 2018 09:09:42 -0500 Subject: [PATCH 12/12] Start the primary node first --- .../org/elasticsearch/cluster/routing/PrimaryAllocationIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java index b7d801103f017..d7a91c988e9da 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java @@ -329,11 +329,11 @@ public void testForceAllocatePrimaryOnNoDecision()
throws Exception { public void testPrimaryReplicaResyncFailed() throws Exception { String master = internalCluster().startMasterOnlyNode(Settings.EMPTY); final int numberOfReplicas = between(2, 3); + final String oldPrimary = internalCluster().startDataOnlyNode(); assertAcked( prepareCreate("test", Settings.builder().put(indexSettings()) .put(SETTING_NUMBER_OF_SHARDS, 1) .put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas))); - final String oldPrimary = internalCluster().startDataOnlyNode(); final ShardId shardId = new ShardId(clusterService().state().metaData().index("test").getIndex(), 0); final Set replicaNodes = new HashSet<>(internalCluster().startDataOnlyNodes(numberOfReplicas)); ensureGreen();