diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 32614c636f128..4e7c66afdcaf0 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -45,6 +46,7 @@ import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; +import java.util.function.Consumer; import java.util.function.Supplier; public class TransportResyncReplicationAction extends TransportWriteAction onPrimaryDemoted, Consumer onIgnoredFailure) { + shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, false, message, exception, + createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); + } + } } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 4398c56f26c77..2cd5f7a5f13ac 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -1172,6 +1172,30 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, R // "alive" if it were to be marked as 
stale. onSuccess.run(); } + + protected final ShardStateAction.Listener createShardActionListener(final Runnable onSuccess, + final Consumer onPrimaryDemoted, + final Consumer onIgnoredFailure) { + return new ShardStateAction.Listener() { + @Override + public void onSuccess() { + onSuccess.run(); + } + + @Override + public void onFailure(Exception shardFailedError) { + if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) { + onPrimaryDemoted.accept(shardFailedError); + } else { + // these can occur if the node is shutting down and are okay + // any other exception here is not expected and merits investigation + assert shardFailedError instanceof TransportException || + shardFailedError instanceof NodeClosedException : shardFailedError; + onIgnoredFailure.accept(shardFailedError); + } + } + }; + } } /** diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 28b8f0826cd91..2a3e8be7aa8bb 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -384,41 +384,16 @@ class WriteActionReplicasProxy extends ReplicasProxy { @Override public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { - - logger.warn((org.apache.logging.log4j.util.Supplier) - () -> new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception); + logger.warn(new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception); shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception, - createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); + createShardActionListener(onSuccess, 
onPrimaryDemoted, onIgnoredFailure)); } @Override public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, - createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); - } - - private ShardStateAction.Listener createListener(final Runnable onSuccess, final Consumer onPrimaryDemoted, - final Consumer onIgnoredFailure) { - return new ShardStateAction.Listener() { - @Override - public void onSuccess() { - onSuccess.run(); - } - - @Override - public void onFailure(Exception shardFailedError) { - if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) { - onPrimaryDemoted.accept(shardFailedError); - } else { - // these can occur if the node is shutting down and are okay - // any other exception here is not expected and merits investigation - assert shardFailedError instanceof TransportException || - shardFailedError instanceof NodeClosedException : shardFailedError; - onIgnoredFailure.accept(shardFailedError); - } - } - }; + createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); } } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java index 86dd2dfe18904..d7a91c988e9da 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java @@ -22,6 +22,7 @@ import com.carrotsearch.hppc.cursors.IntObjectCursor; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequestBuilder; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActiveShardCount; import 
org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -31,6 +32,10 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.gateway.GatewayAllocator; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; @@ -43,15 +48,23 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.isIn; +import static org.hamcrest.Matchers.not; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class PrimaryAllocationIT extends ESIntegTestCase { @@ -309,4 +322,71 @@ public void testForceAllocatePrimaryOnNoDecision() throws Exception { assertEquals(1, client().admin().cluster().prepareState().get().getState() .routingTable().index(indexName).shardsWithState(ShardRoutingState.STARTED).size()); } + + /** + 
* This test asserts that replicas that failed to execute resync operations will be failed but not marked as stale. + */ + public void testPrimaryReplicaResyncFailed() throws Exception { + String master = internalCluster().startMasterOnlyNode(Settings.EMPTY); + final int numberOfReplicas = between(2, 3); + final String oldPrimary = internalCluster().startDataOnlyNode(); + assertAcked( + prepareCreate("test", Settings.builder().put(indexSettings()) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas))); + final ShardId shardId = new ShardId(clusterService().state().metaData().index("test").getIndex(), 0); + final Set replicaNodes = new HashSet<>(internalCluster().startDataOnlyNodes(numberOfReplicas)); + ensureGreen(); + assertAcked( + client(master).admin().cluster().prepareUpdateSettings() + .setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "none")).get()); + logger.info("--> Indexing with gap in seqno to ensure that some operations will be replayed in resync"); + long numDocs = scaledRandomIntBetween(5, 50); + for (int i = 0; i < numDocs; i++) { + IndexResponse indexResult = index("test", "doc", Long.toString(i)); + assertThat(indexResult.getShardInfo().getSuccessful(), equalTo(numberOfReplicas + 1)); + } + final IndexShard oldPrimaryShard = internalCluster().getInstance(IndicesService.class, oldPrimary).getShardOrNull(shardId); + IndexShardTestCase.getEngine(oldPrimaryShard).getLocalCheckpointTracker().generateSeqNo(); // Make gap in seqno. 
+ long moreDocs = scaledRandomIntBetween(1, 10); + for (int i = 0; i < moreDocs; i++) { + IndexResponse indexResult = index("test", "doc", Long.toString(numDocs + i)); + assertThat(indexResult.getShardInfo().getSuccessful(), equalTo(numberOfReplicas + 1)); + } + final Set replicasSide1 = Sets.newHashSet(randomSubsetOf(between(1, numberOfReplicas - 1), replicaNodes)); + final Set replicasSide2 = Sets.difference(replicaNodes, replicasSide1); + NetworkDisruption partition = new NetworkDisruption(new TwoPartitions(replicasSide1, replicasSide2), new NetworkDisconnect()); + internalCluster().setDisruptionScheme(partition); + logger.info("--> isolating some replicas during primary-replica resync"); + partition.startDisrupting(); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(oldPrimary)); + // Checks that we fail replicas on one side but do not mark them as stale. + assertBusy(() -> { + ClusterState state = client(master).admin().cluster().prepareState().get().getState(); + final IndexShardRoutingTable shardRoutingTable = state.routingTable().shardRoutingTable(shardId); + final String newPrimaryNode = state.getRoutingNodes().node(shardRoutingTable.primary.currentNodeId()).node().getName(); + assertThat(newPrimaryNode, not(equalTo(oldPrimary))); + Set selectedPartition = replicasSide1.contains(newPrimaryNode) ? 
replicasSide1 : replicasSide2; + assertThat(shardRoutingTable.activeShards(), hasSize(selectedPartition.size())); + for (ShardRouting activeShard : shardRoutingTable.activeShards()) { + assertThat(state.getRoutingNodes().node(activeShard.currentNodeId()).node().getName(), isIn(selectedPartition)); + } + assertThat(state.metaData().index("test").inSyncAllocationIds(shardId.id()), hasSize(numberOfReplicas + 1)); + }, 1, TimeUnit.MINUTES); + assertAcked( + client(master).admin().cluster().prepareUpdateSettings() + .setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "all")).get()); + partition.stopDisrupting(); + logger.info("--> stop disrupting network and re-enable allocation"); + assertBusy(() -> { + ClusterState state = client(master).admin().cluster().prepareState().get().getState(); + assertThat(state.routingTable().shardRoutingTable(shardId).activeShards(), hasSize(numberOfReplicas)); + assertThat(state.metaData().index("test").inSyncAllocationIds(shardId.id()), hasSize(numberOfReplicas + 1)); + for (String node : replicaNodes) { + IndexShard shard = internalCluster().getInstance(IndicesService.class, node).getShardOrNull(shardId); + assertThat(shard.getLocalCheckpoint(), equalTo(numDocs + moreDocs)); + } + }); + } + }