diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index e3a03b00fa73d..8f867b486f969 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.support.replication; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; @@ -58,6 +59,8 @@ public class ReplicationRequest> extends ChildTa private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT; private volatile boolean canHaveDuplicates = false; + private long routedBasedOnClusterVersion = 0; + public ReplicationRequest() { } @@ -170,6 +173,20 @@ public final T consistencyLevel(WriteConsistencyLevel consistencyLevel) { return (T) this; } + /** + * Sets the minimum version of the cluster state that is required on the next node before we redirect to another primary. + * Used to prevent redirect loops, see also {@link TransportReplicationAction.ReroutePhase#doRun()} + */ + @SuppressWarnings("unchecked") + T routedBasedOnClusterVersion(long routedBasedOnClusterVersion) { + this.routedBasedOnClusterVersion = routedBasedOnClusterVersion; + return (T) this; + } + + long routedBasedOnClusterVersion() { + return routedBasedOnClusterVersion; + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; @@ -192,6 +209,9 @@ public void readFrom(StreamInput in) throws IOException { index = in.readString(); canHaveDuplicates = in.readBoolean(); // no need to serialize threaded* parameters, since they only matter locally + if (in.getVersion().onOrAfter(Version.V_2_4_0)) { + routedBasedOnClusterVersion = in.readVLong(); + } } @Override @@ -207,6 +227,9 @@ public void writeTo(StreamOutput out) throws IOException { timeout.writeTo(out); out.writeString(index); out.writeBoolean(canHaveDuplicates); + if (out.getVersion().onOrAfter(Version.V_2_4_0)) { + out.writeVLong(routedBasedOnClusterVersion); + } } @Override diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 80d33c663af6f..e6288f5175d23 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -479,6 +479,15 @@ protected void doRun() { } performAction(node, transportPrimaryAction, true); } else { + if (state.version() < request.routedBasedOnClusterVersion()) { + logger.trace("failed to find primary [{}] for request [{}] despite sender thinking it would be here. Local cluster state version [{}]] is older than on sending node (version [{}]), scheduling a retry...", request.shardId(), request, state.version(), request.routedBasedOnClusterVersion()); + retryBecauseUnavailable(request.shardId(), "failed to find primary as current cluster state with version [" + state.version() + "] is stale (expected at least [" + request.routedBasedOnClusterVersion() + "]"); + return; + } else { + // chasing the node with the active primary for a second hop requires that we are at least up-to-date with the current cluster state version + // this prevents redirect loops between two nodes when a primary was relocated and the relocation target is not aware that it is the active primary shard already. + request.routedBasedOnClusterVersion(state.version()); + } if (logger.isTraceEnabled()) { logger.trace("send action [{}] on primary [{}] for request [{}] with cluster state version [{}] to [{}]", actionName, request.shardId(), request, state.version(), primary.currentNodeId()); } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 0b04ae0a62892..bee2e79e3c7a8 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; @@ -56,6 +57,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESAllocationTestCase; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.cluster.TestClusterService; import org.elasticsearch.test.transport.CapturingTransport; @@ -72,6 +74,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -217,6 +220,59 @@ public void testNotStartedPrimary() throws InterruptedException, ExecutionExcept assertIndexShardCounter(1); } + /** + * When relocating a primary shard, there is a cluster state update at the end of relocation where the active primary is switched from + * the relocation source to the relocation target. If relocation source receives and processes this cluster state + * before the relocation target, there is a time span where relocation source believes active primary to be on + * relocation target and relocation target believes active primary to be on relocation source. This results in replication + * requests being sent back and forth. + * + * This test checks that replication request is not routed back from relocation target to relocation source in case of + * stale index routing table on relocation target. + */ + @Test + public void testNoRerouteOnStaleClusterState() throws InterruptedException, ExecutionException { + final String index = "test"; + final ShardId shardId = new ShardId(index, 0); + ClusterState state = state(index, true, ShardRoutingState.RELOCATING); + IndexShardRoutingTable shardRoutingTable = state.getRoutingTable().shardRoutingTable(shardId.getIndex(), shardId.id()); + String relocationTargetNode = shardRoutingTable.primaryShard().relocatingNodeId(); + state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(relocationTargetNode)).build(); + clusterService.setState(state); + logger.debug("--> relocation ongoing state:\n{}", clusterService.state().prettyPrint()); + + Request request = new Request(shardId).timeout("1ms").routedBasedOnClusterVersion(clusterService.state().version() + 1); + PlainActionFuture listener = new PlainActionFuture<>(); + TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(null, request, listener); + reroutePhase.run(); + assertListenerThrows("cluster state too old didn't cause a timeout", listener, UnavailableShardsException.class); + + request = new Request(shardId).routedBasedOnClusterVersion(clusterService.state().version() + 1); + listener = new PlainActionFuture<>(); + reroutePhase = action.new ReroutePhase(null, request, listener); + reroutePhase.run(); + assertFalse("cluster state too old didn't cause a retry", listener.isDone()); + + // finish relocation + shardRoutingTable = clusterService.state().getRoutingTable().shardRoutingTable(shardId.getIndex(), shardId.id()); + ShardRouting relocationTarget = shardRoutingTable.shardsWithState(ShardRoutingState.INITIALIZING).get(0); + AllocationService allocationService = ESAllocationTestCase.createAllocationService(); + RoutingAllocation.Result result = allocationService.applyStartedShards(state, Arrays.asList(relocationTarget)); + ClusterState updatedState = ClusterState.builder(clusterService.state()).routingResult(result).build(); + + clusterService.setState(updatedState); + logger.debug("--> relocation complete state:\n{}", clusterService.state().prettyPrint()); + + shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id()); + final String primaryNodeId = shardRoutingTable.primaryShard().currentNodeId(); + final List capturedRequests = + transport.capturedRequestsByTargetNode().get(primaryNodeId); + assertThat(capturedRequests, notNullValue()); + assertThat(capturedRequests.size(), equalTo(1)); + assertThat(capturedRequests.get(0).action, equalTo("testAction[p]")); + assertIndexShardCounter(1); + } + @Test public void testUnknownIndexOrShardOnReroute() throws InterruptedException { final String index = "test";