Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -549,8 +549,9 @@ public void handleResponse(Response response) {
public void handleException(TransportException exp) {
try {
// if we got disconnected from the node, or the node / shard is not in the right state (being closed)
if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException ||
(isPrimaryAction && retryPrimaryException(exp.unwrapCause()))) {
final Throwable cause = exp.unwrapCause();
if (cause instanceof ConnectTransportException || cause instanceof NodeClosedException ||
(isPrimaryAction && retryPrimaryException(cause))) {
logger.trace("received an error from node [{}] for request [{}], scheduling a retry", exp, node.id(), request);
retry(exp);
} else {
Expand Down Expand Up @@ -799,6 +800,12 @@ void finishBecauseUnavailable(ShardId shardId, String message) {
/**
 * Looks up the local {@link IndexShard} for the given shard id and wraps it in a primary shard reference.
 *
 * @param shardId the shard the primary operation is targeting
 * @return a reference created via {@code IndexShardReferenceImpl.createOnPrimary} for the resolved shard
 * @throws RetryOnPrimaryException if the routing entry of the resolved shard is not a primary
 */
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) {
    final IndexShard shard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());
    if (shard.routingEntry().primary()) {
        return IndexShardReferenceImpl.createOnPrimary(shard);
    }
    // We may end up here if the cluster state used to route the primary is so stale that the underlying
    // index shard was replaced with a replica. For example - in a two node cluster, if the primary fails
    // the replica will take over and a replica will be assigned to the first node.
    throw new RetryOnPrimaryException(shard.shardId(), "actual shard is not a primary " + shard.routingEntry());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,6 @@ private void ensureWriteAllowed(Engine.Operation op) throws IllegalIndexShardSta

/**
 * Checks that the current {@code shardRouting} of this shard is flagged as a primary.
 *
 * @throws IllegalStateException if {@code shardRouting.primary()} is {@code false}
 */
private void verifyPrimary() {
    if (shardRouting.primary()) {
        return;
    }
    // must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException
    throw new IllegalStateException("shard is not a primary " + shardRouting);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardNotStartedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -158,15 +160,15 @@ public void testBlocks() throws ExecutionException, InterruptedException {
ReplicationTask task = maybeTask();

ClusterBlocks.Builder block = ClusterBlocks.builder()
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
reroutePhase.run();
assertListenerThrows("primary phase should fail operation", listener, ClusterBlockException.class);
assertPhase(task, "failed");

block = ClusterBlocks.builder()
.addGlobalBlock(new ClusterBlock(1, "retryable", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
.addGlobalBlock(new ClusterBlock(1, "retryable", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
listener = new PlainActionFuture<>();
reroutePhase = action.new ReroutePhase(task, new Request().timeout("5ms"), listener);
Expand All @@ -181,7 +183,7 @@ public void testBlocks() throws ExecutionException, InterruptedException {
assertPhase(task, "waiting_for_retry");

block = ClusterBlocks.builder()
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
assertListenerThrows("primary phase should fail operation when moving from a retryable block to a non-retryable one", listener, ClusterBlockException.class);
assertIndexShardUninitialized();
Expand All @@ -196,7 +198,7 @@ public void testNotStartedPrimary() throws InterruptedException, ExecutionExcept
final ShardId shardId = new ShardId(index, "_na_", 0);
// no replicas in order to skip the replication part
setState(clusterService, state(index, true,
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED));
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED));
ReplicationTask task = maybeTask();

logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
Expand All @@ -221,7 +223,7 @@ public void testNotStartedPrimary() throws InterruptedException, ExecutionExcept
final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
final String primaryNodeId = shardRoutingTable.primaryShard().currentNodeId();
final List<CapturingTransport.CapturedRequest> capturedRequests =
transport.getCapturedRequestsByTargetNodeAndClear().get(primaryNodeId);
transport.getCapturedRequestsByTargetNodeAndClear().get(primaryNodeId);
assertThat(capturedRequests, notNullValue());
assertThat(capturedRequests.size(), equalTo(1));
assertThat(capturedRequests.get(0).action, equalTo("testAction[p]"));
Expand All @@ -234,7 +236,7 @@ public void testNotStartedPrimary() throws InterruptedException, ExecutionExcept
* before the relocation target, there is a time span where relocation source believes active primary to be on
* relocation target and relocation target believes active primary to be on relocation source. This results in replication
* requests being sent back and forth.
*
* <p>
* This test checks that replication request is not routed back from relocation target to relocation source in case of
* stale index routing table on relocation target.
*/
Expand Down Expand Up @@ -271,7 +273,7 @@ public void testNoRerouteOnStaleClusterState() throws InterruptedException, Exec
IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
final String primaryNodeId = shardRoutingTable.primaryShard().currentNodeId();
final List<CapturingTransport.CapturedRequest> capturedRequests =
transport.getCapturedRequestsByTargetNodeAndClear().get(primaryNodeId);
transport.getCapturedRequestsByTargetNodeAndClear().get(primaryNodeId);
assertThat(capturedRequests, notNullValue());
assertThat(capturedRequests.size(), equalTo(1));
assertThat(capturedRequests.get(0).action, equalTo("testAction[p]"));
Expand All @@ -282,7 +284,7 @@ public void testUnknownIndexOrShardOnReroute() throws InterruptedException {
final String index = "test";
// no replicas in order to skip the replication part
setState(clusterService, state(index, true,
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED));
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED));
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
Request request = new Request(new ShardId("unknown_index", "_na_", 0)).timeout("1ms");
PlainActionFuture<Response> listener = new PlainActionFuture<>();
Expand All @@ -299,6 +301,61 @@ public void testUnknownIndexOrShardOnReroute() throws InterruptedException {
assertListenerThrows("must throw shard not found exception", listener, ShardNotFoundException.class);
}

/**
 * Checks how the reroute phase reacts when the node it routed the primary action to answers with one of the
 * errors produced by {@code randomRetryPrimaryException}.
 * <p>
 * Two randomly chosen scenarios are covered:
 * <ul>
 * <li>request timeout of {@code 0s}: the primary action is sent once more; when that attempt fails as well the
 * listener is completed exceptionally and the task phase becomes {@code failed}</li>
 * <li>request timeout of {@code 1h}: the listener stays pending until a cluster state is published, after which
 * the primary action is sent again</li>
 * </ul>
 */
public void testStalePrimaryShardOnReroute() throws InterruptedException {
    final String index = "test";
    final ShardId shardId = new ShardId(index, "_na_", 0);
    // Start from a state with an active primary. Only the reroute phase is driven by this test.
    // NOTE(review): the last argument looks like a replica count (randomized here) - confirm against the helper.
    setState(clusterService, stateWithActivePrimary(index, true, randomInt(3)));
    logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
    Request request = new Request(shardId);
    final boolean timeout = randomBoolean();
    // "0s" exercises the path taken once the request has timed out, "1h" the wait-for-cluster-state path
    request.timeout(timeout ? "0s" : "1h");
    PlainActionFuture<Response> listener = new PlainActionFuture<>();
    ReplicationTask task = maybeTask();

    TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
    reroutePhase.run();
    CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
    assertThat(capturedRequests, arrayWithSize(1));
    assertThat(capturedRequests[0].action, equalTo("testAction[p]"));
    assertPhase(task, "waiting_on_primary");
    // simulate the remote node rejecting the request with a retryable primary failure
    transport.handleRemoteError(capturedRequests[0].requestId, randomRetryPrimaryException(shardId));

    if (timeout) {
        // we always try at least one more time on timeout
        assertThat(listener.isDone(), equalTo(false));
        capturedRequests = transport.getCapturedRequestsAndClear();
        assertThat(capturedRequests, arrayWithSize(1));
        assertThat(capturedRequests[0].action, equalTo("testAction[p]"));
        assertPhase(task, "waiting_on_primary");
        transport.handleRemoteError(capturedRequests[0].requestId, randomRetryPrimaryException(shardId));
        // the injected error is one of several exception types, so only the common base type is asserted
        assertListenerThrows("must fail with the retryable primary exception after the request timed out", listener,
            ElasticsearchException.class);
        assertPhase(task, "failed");
    } else {
        assertThat(listener.isDone(), equalTo(false));
        // generate a CS change
        setState(clusterService, clusterService.state());
        capturedRequests = transport.getCapturedRequestsAndClear();
        assertThat(capturedRequests, arrayWithSize(1));
        assertThat(capturedRequests[0].action, equalTo("testAction[p]"));
    }
}

/**
 * Picks, at random, one of the exception types this test treats as a retryable failure of the primary action.
 *
 * @param shardId the shard (and, for the index-level exception, its index) the exception refers to
 * @return a freshly created exception chosen via {@code randomFrom}
 */
private ElasticsearchException randomRetryPrimaryException(ShardId shardId) {
    final ElasticsearchException[] candidates = {
        new ShardNotFoundException(shardId),
        new IndexNotFoundException(shardId.getIndex()),
        new IndexShardClosedException(shardId),
        new EngineClosedException(shardId),
        new TransportReplicationAction.RetryOnPrimaryException(shardId, "hello")
    };
    return randomFrom(candidates);
}

public void testRoutePhaseExecutesRequest() {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
Expand Down Expand Up @@ -449,7 +506,7 @@ protected Tuple<Response, Request> shardOperationOnPrimary(MetaData metaData, Re
PlainActionFuture<Response> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = actionWithRelocatingReplicasAfterPrimaryOp.new PrimaryPhase(
task, request, createTransportChannel(listener));
task, request, createTransportChannel(listener));
primaryPhase.run();
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
ShardRouting relocatingReplicaShard = stateWithRelocatingReplica.getRoutingTable().shardRoutingTable(index, shardId.id()).replicaShards().get(0);
Expand Down Expand Up @@ -485,7 +542,7 @@ protected Tuple<Response, Request> shardOperationOnPrimary(MetaData metaData, Re
PlainActionFuture<Response> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = actionWithDeletedIndexAfterPrimaryOp.new PrimaryPhase(
task, request, createTransportChannel(listener));
task, request, createTransportChannel(listener));
primaryPhase.run();
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
assertThat("replication phase should be skipped if index gets deleted after primary operation", transport.capturedRequestsByTargetNode().size(), equalTo(0));
Expand Down Expand Up @@ -529,8 +586,8 @@ public void testWriteConsistency() throws ExecutionException, InterruptedExcepti

setState(clusterService, state(index, true, ShardRoutingState.STARTED, replicaStates));
logger.debug("using consistency level of [{}], assigned shards [{}], total shards [{}]. expecting op to [{}]. using state: \n{}",
request.consistencyLevel(), 1 + assignedReplicas, 1 + assignedReplicas + unassignedReplicas, passesWriteConsistency ? "succeed" : "retry",
clusterService.state().prettyPrint());
request.consistencyLevel(), 1 + assignedReplicas, 1 + assignedReplicas + unassignedReplicas, passesWriteConsistency ? "succeed" : "retry",
clusterService.state().prettyPrint());

final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
PlainActionFuture<Response> listener = new PlainActionFuture<>();
Expand Down Expand Up @@ -646,7 +703,7 @@ protected void runReplicateTest(ClusterState state, IndexShardRoutingTable shard

TransportChannel channel = createTransportChannel(listener, error::set);
TransportReplicationAction<Request, Request, Response>.ReplicationPhase replicationPhase =
action.new ReplicationPhase(task, request, new Response(), request.shardId(), channel, reference);
action.new ReplicationPhase(task, request, new Response(), request.shardId(), channel, reference);

assertThat(replicationPhase.totalShards(), equalTo(totalShards));
assertThat(replicationPhase.pending(), equalTo(assignedReplicas));
Expand All @@ -656,7 +713,7 @@ protected void runReplicateTest(ClusterState state, IndexShardRoutingTable shard

HashMap<String, Request> nodesSentTo = new HashMap<>();
boolean executeOnReplica =
action.shouldExecuteReplication(clusterService.state().getMetaData().index(shardId.getIndex()).getSettings());
action.shouldExecuteReplication(clusterService.state().getMetaData().index(shardId.getIndex()).getSettings());
for (CapturingTransport.CapturedRequest capturedRequest : capturedRequests) {
// no duplicate requests
Request replicationRequest = (Request) capturedRequest.request;
Expand Down Expand Up @@ -819,7 +876,7 @@ public void testCounterIncrementedWhileReplicationOngoing() throws InterruptedEx
final ShardId shardId = new ShardId(index, "_na_", 0);
// one replica to make sure replication is attempted
setState(clusterService, state(index, true,
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
ShardRouting primaryShard = clusterService.state().routingTable().shardRoutingTable(shardId).primaryShard();
indexShardRouting.set(primaryShard);
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
Expand Down Expand Up @@ -856,7 +913,7 @@ public void testCounterIncrementedWhileReplicationOngoing() throws InterruptedEx
public void testReplicasCounter() throws Exception {
final ShardId shardId = new ShardId("test", "_na_", 0);
setState(clusterService, state(shardId.getIndexName(), true,
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
action = new ActionWithDelay(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool);
final Action.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler();
final ReplicationTask task = maybeTask();
Expand Down Expand Up @@ -895,7 +952,7 @@ public void testCounterDecrementedIfShardOperationThrowsException() throws Inter
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
setState(clusterService, state(index, true,
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
Request request = new Request(shardId).timeout("100ms");
PlainActionFuture<Response> listener = new PlainActionFuture<>();
Expand All @@ -915,7 +972,7 @@ public void testReroutePhaseRetriedAfterDemotedPrimary() {
final ShardId shardId = new ShardId(index, "_na_", 0);
boolean localPrimary = true;
setState(clusterService, state(index, localPrimary,
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
Action action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
@Override
protected void resolveRequest(MetaData metaData, String concreteIndex, Request request) {
Expand Down Expand Up @@ -967,7 +1024,7 @@ protected void resolveRequest(MetaData metaData, String concreteIndex, Request r
// publish a new cluster state
boolean localPrimaryOnRetry = randomBoolean();
setState(clusterService, state(index, localPrimaryOnRetry,
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
CapturingTransport.CapturedRequest[] primaryRetry = transport.getCapturedRequestsAndClear();

// the request should be retried
Expand Down Expand Up @@ -1083,8 +1140,8 @@ class Action extends TransportReplicationAction<Request, Request, Response> {
ClusterService clusterService,
ThreadPool threadPool) {
super(settings, actionName, transportService, clusterService, null, threadPool,
new ShardStateAction(settings, clusterService, transportService, null, null, threadPool),
new ActionFilters(new HashSet<ActionFilter>()), new IndexNameExpressionResolver(Settings.EMPTY), Request::new, Request::new, ThreadPool.Names.SAME);
new ShardStateAction(settings, clusterService, transportService, null, null, threadPool),
new ActionFilters(new HashSet<ActionFilter>()), new IndexNameExpressionResolver(Settings.EMPTY), Request::new, Request::new, ThreadPool.Names.SAME);
}

@Override
Expand Down