From 703ff2b5abeb487ef685df941b418a68053d68c0 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 4 Jan 2016 07:09:10 -0500 Subject: [PATCH 01/13] Wait for new master when failing shard This commit handles the situation when we are failing a shard and either no master is known, or the known master left while failing the shard. We handle this situation by waiting for a new master to be reelected, and then sending the shard failed request to the new master. --- .../TransportReplicationAction.java | 89 +++++++++++++++++-- 1 file changed, 81 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 0014404057f50..aef455d14b0dc 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -31,6 +31,8 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; +import org.elasticsearch.cluster.MasterNodeChangePredicate; +import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -42,6 +44,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; @@ -76,6 +79,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -882,14 +886,27 @@ public void handleException(TransportException exp) { if (ignoreReplicaException(exp)) { onReplicaFailure(nodeId, exp); } else { - logger.warn("{} failed to perform {} on node {}", exp, shardId, transportReplicaAction, node); - shardStateAction.shardFailed(clusterService.state(), shard, indexUUID, "failed to perform " + transportReplicaAction + " on replica on node " + node, exp, shardFailedTimeout, new ReplicationFailedShardStateListener(nodeId, exp)); + ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger); + String message = String.format(Locale.ROOT, "failed to perform %s on replica on node %s", transportReplicaAction, node); + ReplicationFailedShardStateListener listener = new ReplicationFailedShardStateListener(observer, shard, exp, message, nodeId); + shardFailed(observer.observedState(), shard, exp, message, listener); } } } ); } + private void shardFailed(ClusterState clusterState, ShardRouting shard, TransportException exp, String message, ShardStateAction.Listener listener) { + logger.warn("{} {}", exp, shardId, message); + shardStateAction.shardFailed( + clusterState, + shard, + indexUUID, + message, + exp, + shardFailedTimeout, + listener); + } void onReplicaFailure(String nodeId, @Nullable Throwable e) { // Only version conflict should be ignored from being put into the _shards header? @@ -957,30 +974,86 @@ private void doFinish() { } public class ReplicationFailedShardStateListener implements ShardStateAction.Listener { + private final ClusterStateObserver observer; + private final ShardRouting shard; + private final TransportException exp; + private final String message; private final String nodeId; - private Throwable failure; - public ReplicationFailedShardStateListener(String nodeId, Throwable failure) { + public ReplicationFailedShardStateListener( + ClusterStateObserver observer, ShardRouting shard, TransportException exp, + String message, + String nodeId) { + this.observer = observer; + this.shard = shard; + this.exp = exp; + this.message = message; this.nodeId = nodeId; - this.failure = failure; } @Override public void onSuccess() { - onReplicaFailure(nodeId, failure); + // TODO: validate the cluster state and retry? + onReplicaFailure(nodeId, exp); } @Override public void onShardFailedNoMaster() { - onReplicaFailure(nodeId, failure); + waitForNewMasterAndRetry(); } @Override public void onShardFailedFailure(DiscoveryNode master, TransportException e) { if (e instanceof ReceiveTimeoutTransportException) { logger.trace("timeout sending shard failure to master [{}]", e, master); + // TODO: recheck the cluster state and retry indefinitely? + onReplicaFailure(nodeId, exp); + } else if (e.getCause() instanceof NotMasterException) { + waitForNewMasterAndRetry(); + } + } + + private void waitForNewMasterAndRetry() { + observer.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + retry(state); + } + + @Override + public void onClusterServiceClose() { + logger.error("{} node closed while handling failed shard [{}]", exp, shard.shardId(), shard); + forceFinishAsFailed(new NodeClosedException(clusterService.localNode())); + } + + @Override + public void onTimeout(TimeValue timeout) { + // we wait indefinitely for a new master + assert false; + } + }, MasterNodeChangePredicate.INSTANCE); + } + + private void retry(ClusterState clusterState) { + if (!isFailed(shard, clusterState)) { + shardFailed(clusterState, shard, exp, message, this); + } else { + // the shard has already been failed, so just signal replica failure + onReplicaFailure(nodeId, exp); + } + } + + private boolean isFailed(ShardRouting shardRouting, ClusterState clusterState) { + // verify that the shard we requested to fail is no longer in the cluster state + RoutingNode routingNode = clusterState.getRoutingNodes().node(shardRouting.currentNodeId()); + if (routingNode == null) { + // the node left + return true; + } else { + // the same shard is gone + ShardRouting sr = routingNode.get(shardRouting.getId()); + return sr == null || !sr.isSameAllocation(shardRouting); } - onReplicaFailure(nodeId, failure); } } } From 58c2a3b6879d404807ce0e7b595c2226138dddbb Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 5 Jan 2016 09:58:53 -0500 Subject: [PATCH 02/13] Add simulation of master leaving in TransportReplicationActionTests This commit adds a simulation of the master leaving after a shard failure request has been sent. In this case, after a new cluster state is published (simulating a new master having been elected), the request to fail the shard should be retried. --- .../TransportReplicationActionTests.java | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 0a390ea37062d..c8bf95a6e0a3a 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -557,7 +558,24 @@ action.new ReplicationPhase(request, // the shard the request was sent to and the shard to be failed should be the same assertEquals(shardRoutingEntry.getShardRouting(), routing); failures.add(shardFailedRequest); - transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE); + if (randomBoolean()) { + // simulate master left and test that the shard failure is retried + int numberOfRetries = randomIntBetween(1, 4); + CapturingTransport.CapturedRequest currentRequest = shardFailedRequest; + for (int retryNumber = 0; retryNumber < numberOfRetries; retryNumber++) { + // force a new cluster state to simulate a new master having been elected + clusterService.setState(ClusterState.builder(clusterService.state())); + transport.handleResponse(currentRequest.requestId, new NotMasterException("shard-failed-test")); + CapturingTransport.CapturedRequest[] retryRequests = transport.capturedRequests(); + transport.clear(); + assertEquals(1, retryRequests.length); + currentRequest = retryRequests[0]; + } + // now simulate that the last retry succeeded + transport.handleResponse(currentRequest.requestId, TransportResponse.Empty.INSTANCE); + } else { + transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE); + } } } else { successful++; From f49435c78b8a9574d54997492e5305840b610c57 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 8 Jan 2016 16:39:14 -0500 Subject: [PATCH 03/13] Centrally handle channel failures when failing a shard This commit moves the handling of channel failures when failing a shard to o.e.c.a.s.ShardStateAction. This means that shard failure requests that timeout or occur when there is no master or the master leaves after the request is sent will now be retried from here. The listener for a shard failed request will now only be notified upon successful completion of the shard failed request, or when a catastrophic non-channel failure occurs. --- .../TransportReplicationAction.java | 123 ++----------- .../action/shard/ShardStateAction.java | 98 +++++++--- .../cluster/IndicesClusterStateService.java | 6 +- .../action/shard/ShardStateActionTests.java | 174 +++++++++++++----- 4 files changed, 222 insertions(+), 179 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index aef455d14b0dc..b261851abdbcb 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -31,8 +31,6 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; -import org.elasticsearch.cluster.MasterNodeChangePredicate; -import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -44,7 +42,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; @@ -67,7 +64,6 @@ import org.elasticsearch.transport.BaseTransportResponseHandler; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.EmptyTransportResponseHandler; -import org.elasticsearch.transport.ReceiveTimeoutTransportException; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportChannelResponseHandler; import org.elasticsearch.transport.TransportException; @@ -886,28 +882,33 @@ public void handleException(TransportException exp) { if (ignoreReplicaException(exp)) { onReplicaFailure(nodeId, exp); } else { - ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger); String message = String.format(Locale.ROOT, "failed to perform %s on replica on node %s", transportReplicaAction, node); - ReplicationFailedShardStateListener listener = new ReplicationFailedShardStateListener(observer, shard, exp, message, nodeId); - shardFailed(observer.observedState(), shard, exp, message, listener); + logger.warn("{} {}", exp, shardId, message); + shardStateAction.shardFailed( + shard, + indexUUID, + message, + exp, + shardFailedTimeout, + new ShardStateAction.Listener() { + @Override + public void onSuccess() { + onReplicaFailure(nodeId, exp); + } + + @Override + public void onShardFailedFailure(Exception e) { + // TODO: handle catastrophic non-channel failures + onReplicaFailure(nodeId, exp); + } + } + ); } } } ); } - private void shardFailed(ClusterState clusterState, ShardRouting shard, TransportException exp, String message, ShardStateAction.Listener listener) { - logger.warn("{} {}", exp, shardId, message); - shardStateAction.shardFailed( - clusterState, - shard, - indexUUID, - message, - exp, - shardFailedTimeout, - listener); - } - void onReplicaFailure(String nodeId, @Nullable Throwable e) { // Only version conflict should be ignored from being put into the _shards header? if (e != null && ignoreReplicaException(e) == false) { @@ -972,90 +973,6 @@ private void doFinish() { } } } - - public class ReplicationFailedShardStateListener implements ShardStateAction.Listener { - private final ClusterStateObserver observer; - private final ShardRouting shard; - private final TransportException exp; - private final String message; - private final String nodeId; - - public ReplicationFailedShardStateListener( - ClusterStateObserver observer, ShardRouting shard, TransportException exp, - String message, - String nodeId) { - this.observer = observer; - this.shard = shard; - this.exp = exp; - this.message = message; - this.nodeId = nodeId; - } - - @Override - public void onSuccess() { - // TODO: validate the cluster state and retry? - onReplicaFailure(nodeId, exp); - } - - @Override - public void onShardFailedNoMaster() { - waitForNewMasterAndRetry(); - } - - @Override - public void onShardFailedFailure(DiscoveryNode master, TransportException e) { - if (e instanceof ReceiveTimeoutTransportException) { - logger.trace("timeout sending shard failure to master [{}]", e, master); - // TODO: recheck the cluster state and retry indefinitely? - onReplicaFailure(nodeId, exp); - } else if (e.getCause() instanceof NotMasterException) { - waitForNewMasterAndRetry(); - } - } - - private void waitForNewMasterAndRetry() { - observer.waitForNextChange(new ClusterStateObserver.Listener() { - @Override - public void onNewClusterState(ClusterState state) { - retry(state); - } - - @Override - public void onClusterServiceClose() { - logger.error("{} node closed while handling failed shard [{}]", exp, shard.shardId(), shard); - forceFinishAsFailed(new NodeClosedException(clusterService.localNode())); - } - - @Override - public void onTimeout(TimeValue timeout) { - // we wait indefinitely for a new master - assert false; - } - }, MasterNodeChangePredicate.INSTANCE); - } - - private void retry(ClusterState clusterState) { - if (!isFailed(shard, clusterState)) { - shardFailed(clusterState, shard, exp, message, this); - } else { - // the shard has already been failed, so just signal replica failure - onReplicaFailure(nodeId, exp); - } - } - - private boolean isFailed(ShardRouting shardRouting, ClusterState clusterState) { - // verify that the shard we requested to fail is no longer in the cluster state - RoutingNode routingNode = clusterState.getRoutingNodes().node(shardRouting.currentNodeId()); - if (routingNode == null) { - // the node left - return true; - } else { - // the same shard is gone - ShardRouting sr = routingNode.get(shardRouting.getId()); - return sr == null || !sr.isSameAllocation(shardRouting); - } - } - } } /** diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 00a238504f232..3a37767c04f88 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -22,9 +22,11 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.MasterNodeChangePredicate; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -42,8 +44,10 @@ import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.EmptyTransportResponseHandler; +import org.elasticsearch.transport.ReceiveTimeoutTransportException; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; @@ -64,51 +68,94 @@ public class ShardStateAction extends AbstractComponent { public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure"; private final TransportService transportService; + private final ClusterService clusterService; @Inject public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService, AllocationService allocationService, RoutingService routingService) { super(settings); this.transportService = transportService; + this.clusterService = clusterService; transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger)); transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler(clusterService, new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger)); } - public void shardFailed(final ClusterState clusterState, final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) { - shardFailed(clusterState, shardRouting, indexUUID, message, failure, null, listener); + public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) { + shardFailed(shardRouting, indexUUID, message, failure, null, listener); } - public void resendShardFailed(final ClusterState clusterState, final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) { + public void resendShardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) { logger.trace("{} re-sending failed shard [{}], index UUID [{}], reason [{}]", shardRouting.shardId(), failure, shardRouting, indexUUID, message); - shardFailed(clusterState, shardRouting, indexUUID, message, failure, listener); + shardFailed(shardRouting, indexUUID, message, failure, listener); } - public void shardFailed(final ClusterState clusterState, final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, TimeValue timeout, Listener listener) { - DiscoveryNode masterNode = clusterState.nodes().masterNode(); - if (masterNode == null) { - logger.warn("{} no master known to fail shard [{}]", shardRouting.shardId(), shardRouting); - listener.onShardFailedNoMaster(); - return; - } + public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, TimeValue timeout, Listener listener) { + ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger); ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, failure); TransportRequestOptions options = TransportRequestOptions.EMPTY; if (timeout != null) { options = TransportRequestOptions.builder().withTimeout(timeout).build(); } - transportService.sendRequest(masterNode, - SHARD_FAILED_ACTION_NAME, shardRoutingEntry, options, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { - @Override - public void handleResponse(TransportResponse.Empty response) { - listener.onSuccess(); - } + sendShardFailed(observer, shardRoutingEntry, options, listener); + } - @Override - public void handleException(TransportException exp) { - logger.warn("{} unexpected failure while sending request to [{}] to fail shard [{}]", exp, shardRoutingEntry.shardRouting.shardId(), masterNode, shardRoutingEntry); - listener.onShardFailedFailure(masterNode, exp); - } - }); + private void sendShardFailed(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, TransportRequestOptions options, Listener listener) { + DiscoveryNode masterNode = observer.observedState().nodes().masterNode(); + if (masterNode == null) { + logger.warn("{} no master known to fail shard [{}]", shardRoutingEntry.getShardRouting().shardId(), shardRoutingEntry.getShardRouting()); + waitForNewMasterAndRetry(observer, shardRoutingEntry, options, listener); + } else { + transportService.sendRequest(masterNode, + SHARD_FAILED_ACTION_NAME, shardRoutingEntry, options, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { + @Override + public void handleResponse(TransportResponse.Empty response) { + listener.onSuccess(); + } + + @Override + public void handleException(TransportException exp) { + if (exp instanceof ReceiveTimeoutTransportException) { + logger.trace("{} timeout sending shard failure [{}] to master [{}]", exp, shardRoutingEntry.getShardRouting().getId(), shardRoutingEntry.failure, masterNode); + handleTimeout(shardRoutingEntry, observer, options, listener); + } else if (exp.getCause() instanceof NotMasterException) { + waitForNewMasterAndRetry(observer, shardRoutingEntry, options, listener); + } else { + logger.warn("{} unexpected failure while sending request to [{}] to fail shard [{}]", exp, shardRoutingEntry.getShardRouting().shardId(), masterNode, shardRoutingEntry); + listener.onShardFailedFailure(exp); + } + } + }); + } + } + + // visible for testing + protected void handleTimeout(ShardRoutingEntry shardRoutingEntry, ClusterStateObserver observer, TransportRequestOptions options, Listener listener) { + // set the observed state to the latest cluster state + observer.reset(clusterService.state()); + sendShardFailed(observer, shardRoutingEntry, options, listener); + } + + // visible for testing + protected void waitForNewMasterAndRetry(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, TransportRequestOptions options, Listener listener) { + observer.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + sendShardFailed(observer, shardRoutingEntry, options, listener); + } + + @Override + public void onClusterServiceClose() { + logger.error("{} node closed while handling failed shard [{}]", shardRoutingEntry.failure, shardRoutingEntry.getShardRouting().getId(), shardRoutingEntry.getShardRouting()); + listener.onShardFailedFailure(new NodeClosedException(clusterService.localNode())); + } + + @Override + public void onTimeout(TimeValue timeout) { + // we wait indefinitely for a new master + assert false; + } + }, MasterNodeChangePredicate.INSTANCE); } private static class ShardFailedTransportHandler implements TransportRequestHandler { @@ -334,10 +381,7 @@ public interface Listener { default void onSuccess() { } - default void onShardFailedNoMaster() { - } - - default void onShardFailedFailure(final DiscoveryNode master, final TransportException e) { + default void onShardFailedFailure(final Exception e) { } } } diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 9357de7b1eb2f..6cb30789dda12 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -458,7 +458,7 @@ private void applyNewOrUpdatedShards(final ClusterChangedEvent event) { if (!indexService.hasShard(shardId) && shardRouting.started()) { if (failedShards.containsKey(shardRouting.shardId())) { if (nodes.masterNode() != null) { - shardStateAction.resendShardFailed(event.state(), shardRouting, indexMetaData.getIndexUUID(), + shardStateAction.resendShardFailed(shardRouting, indexMetaData.getIndexUUID(), "master " + nodes.masterNode() + " marked shard as started, but shard has previous failed. resending shard failure.", null, SHARD_STATE_ACTION_LISTENER); } } else { @@ -590,7 +590,7 @@ private void applyInitializingShard(final ClusterState state, final IndexMetaDat if (!indexService.hasShard(shardId)) { if (failedShards.containsKey(shardRouting.shardId())) { if (nodes.masterNode() != null) { - shardStateAction.resendShardFailed(state, shardRouting, indexMetaData.getIndexUUID(), + shardStateAction.resendShardFailed(shardRouting, indexMetaData.getIndexUUID(), "master " + nodes.masterNode() + " marked shard as initializing, but shard is marked as failed, resend shard failure", null, SHARD_STATE_ACTION_LISTENER); } return; @@ -788,7 +788,7 @@ private void sendFailShard(ShardRouting shardRouting, String indexUUID, String m try { logger.warn("[{}] marking and sending shard failed due to [{}]", failure, shardRouting.shardId(), message); failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version())); - shardStateAction.shardFailed(clusterService.state(), shardRouting, indexUUID, message, failure, SHARD_STATE_ACTION_LISTENER); + shardStateAction.shardFailed(shardRouting, indexUUID, message, failure, SHARD_STATE_ACTION_LISTENER); } catch (Throwable e1) { logger.warn("[{}][{}] failed to mark shard as failed (because of [{}])", e1, shardRouting.getIndex(), shardRouting.getId(), message); } diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index f69c9149a9c97..c603d94ec3451 100644 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -20,20 +20,25 @@ package org.elasticsearch.cluster.action.shard; import org.apache.lucene.index.CorruptIndexException; +import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.ClusterStateObserver; +import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.cluster.TestClusterService; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.ReceiveTimeoutTransportException; import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import org.junit.After; import org.junit.AfterClass; @@ -50,11 +55,57 @@ public class ShardStateActionTests extends ESTestCase { private static ThreadPool THREAD_POOL; - private ShardStateAction shardStateAction; + private AtomicBoolean timeout; + private TestShardStateAction shardStateAction; private CapturingTransport transport; private TransportService transportService; private TestClusterService clusterService; + private static class TestShardStateAction extends ShardStateAction { + public TestShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService, AllocationService allocationService, RoutingService routingService) { + super(settings, clusterService, transportService, allocationService, routingService); + } + + private Runnable onBeforeTimeout; + + public void setOnBeforeTimeout(Runnable onBeforeTimeout) { + this.onBeforeTimeout = onBeforeTimeout; + } + + private Runnable onAfterTimeout; + + + public void setOnAfterTimeout(Runnable onAfterTimeout) { + this.onAfterTimeout = onAfterTimeout; + } + + @Override + protected void handleTimeout(ShardRoutingEntry shardRoutingEntry, ClusterStateObserver observer, TransportRequestOptions options, Listener listener) { + onBeforeTimeout.run(); + super.handleTimeout(shardRoutingEntry, observer, options, listener); + onAfterTimeout.run(); + } + + private Runnable onBeforeWaitForNewMasterAndRetry; + + public void setOnBeforeWaitForNewMasterAndRetry(Runnable onBeforeWaitForNewMasterAndRetry) { + this.onBeforeWaitForNewMasterAndRetry = onBeforeWaitForNewMasterAndRetry; + } + + private Runnable onAfterWaitForNewMasterAndRetry; + + public void setOnAfterWaitForNewMasterAndRetry(Runnable onAfterWaitForNewMasterAndRetry) { + this.onAfterWaitForNewMasterAndRetry = onAfterWaitForNewMasterAndRetry; + } + + @Override + protected void waitForNewMasterAndRetry(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, TransportRequestOptions options, Listener listener) { + onBeforeWaitForNewMasterAndRetry.run(); + super.waitForNewMasterAndRetry(observer, shardRoutingEntry, options, listener); + onAfterWaitForNewMasterAndRetry.run(); + } + } + @BeforeClass public static void startThreadPool() { THREAD_POOL = new ThreadPool("ShardStateActionTest"); @@ -68,7 +119,12 @@ public void setUp() throws Exception { clusterService = new TestClusterService(THREAD_POOL); transportService = new TransportService(transport, THREAD_POOL); transportService.start(); - shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null); + this.timeout = new AtomicBoolean(); + shardStateAction = new TestShardStateAction(Settings.EMPTY, clusterService, transportService, null, null); + shardStateAction.setOnBeforeTimeout(() -> {}); + shardStateAction.setOnAfterTimeout(() -> {}); + shardStateAction.setOnBeforeWaitForNewMasterAndRetry(() -> {}); + shardStateAction.setOnAfterWaitForNewMasterAndRetry(() -> {}); } @Override @@ -84,94 +140,93 @@ public static void stopThreadPool() { THREAD_POOL = null; } - public void testNoMaster() { + public void testNoMaster() throws InterruptedException { final String index = "test"; clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); - DiscoveryNodes.Builder builder = DiscoveryNodes.builder(clusterService.state().nodes()); - builder.masterNodeId(null); - clusterService.setState(ClusterState.builder(clusterService.state()).nodes(builder)); + DiscoveryNodes.Builder noMasterBuilder = DiscoveryNodes.builder(clusterService.state().nodes()); + noMasterBuilder.masterNodeId(null); + clusterService.setState(ClusterState.builder(clusterService.state()).nodes(noMasterBuilder)); String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); + CountDownLatch latch = new CountDownLatch(1); AtomicBoolean noMaster = new AtomicBoolean(); - assert !noMaster.get(); + AtomicBoolean retried = new AtomicBoolean(); + AtomicBoolean success = new AtomicBoolean(); - shardStateAction.shardFailed(clusterService.state(), getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { - @Override - public void onShardFailedNoMaster() { - noMaster.set(true); - } + setUpMasterRetryVerification(noMaster, retried, latch); + shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override - public void onShardFailedFailure(DiscoveryNode master, TransportException e) { - + public void onSuccess() { + success.set(true); + latch.countDown(); } }); + latch.await(); + assertTrue(noMaster.get()); + assertTrue(retried.get()); + assertTrue(success.get()); } - public void testFailure() { + public void testMasterLeft() throws InterruptedException { final String index = "test"; clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); - AtomicBoolean failure = new AtomicBoolean(); - assert !failure.get(); - - shardStateAction.shardFailed(clusterService.state(), getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { - @Override - public void onShardFailedNoMaster() { + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean noMaster = new AtomicBoolean(); + AtomicBoolean retried = new AtomicBoolean(); + AtomicBoolean success = new AtomicBoolean(); - } + setUpMasterRetryVerification(noMaster, retried, latch); + shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override - public void onShardFailedFailure(DiscoveryNode master, TransportException e) { - failure.set(true); + public void onSuccess() { + success.set(true); + latch.countDown(); } }); - final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); + final CapturingTransport.CapturedRequest[] capturedRequests = transport.capturedRequests(); + transport.clear(); assertThat(capturedRequests.length, equalTo(1)); - assert !failure.get(); - transport.handleResponse(capturedRequests[0].requestId, new TransportException("simulated")); + assertFalse(success.get()); + transport.handleResponse(capturedRequests[0].requestId, new NotMasterException("simulated")); - assertTrue(failure.get()); + latch.await(); + assertTrue(success.get()); } - public void testTimeout() throws InterruptedException { + public void testUnhandledFailure() { final String index = "test"; clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); - AtomicBoolean progress = new AtomicBoolean(); - AtomicBoolean timedOut = new AtomicBoolean(); + AtomicBoolean failure = new AtomicBoolean(); - TimeValue timeout = new TimeValue(1, TimeUnit.MILLISECONDS); - CountDownLatch latch = new CountDownLatch(1); - shardStateAction.shardFailed(clusterService.state(), getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), timeout, new ShardStateAction.Listener() { + shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override - public void onShardFailedFailure(DiscoveryNode master, TransportException e) { - if (e instanceof ReceiveTimeoutTransportException) { - assertFalse(progress.get()); - timedOut.set(true); - } - latch.countDown(); + public void onShardFailedFailure(Exception e) { + failure.set(true); } }); - latch.await(); - progress.set(true); - assertTrue(timedOut.get()); - final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertThat(capturedRequests.length, equalTo(1)); + assertFalse(failure.get()); + transport.handleResponse(capturedRequests[0].requestId, new TransportException("simulated")); + + assertTrue(failure.get()); } private ShardRouting getRandomShardRouting(String index) { @@ -182,6 +237,33 @@ private ShardRouting getRandomShardRouting(String index) { return shardRouting; } + private void setUpMasterRetryVerification(AtomicBoolean noMaster, AtomicBoolean retried, CountDownLatch latch) { + shardStateAction.setOnBeforeWaitForNewMasterAndRetry(() -> { + DiscoveryNodes.Builder masterBuilder = DiscoveryNodes.builder(clusterService.state().nodes()); + masterBuilder.masterNodeId(clusterService.state().nodes().masterNodes().iterator().next().value.id()); + clusterService.setState(ClusterState.builder(clusterService.state()).nodes(masterBuilder)); + }); + + shardStateAction.setOnAfterWaitForNewMasterAndRetry(() -> verifyRetry(noMaster, retried, latch)); + } + + private void verifyRetry(AtomicBoolean invoked, AtomicBoolean retried, CountDownLatch latch) { + invoked.set(true); + + // assert a retry request was sent + final CapturingTransport.CapturedRequest[] capturedRequests = transport.capturedRequests(); + transport.clear(); + retried.set(capturedRequests.length == 1); + if (retried.get()) { + // finish the request + transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); + } else { + // there failed to be a retry request + // release the driver thread to fail the test + latch.countDown(); + } + } + private Throwable getSimulatedFailure() { return new CorruptIndexException("simulated", (String) null); } From f17f9a5f360a0266d0fcc9bf2ff445b28baaa3df Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 11 Jan 2016 12:51:41 -0500 Subject: [PATCH 04/13] Remove timeout mechanism from ShardStateAction This commit removes the timeout retry mechanism from ShardStateAction allowing it to instead be handled by the general master channel retry mechanism. The idea is that if there is a network issue, the master will miss a ping timeout causing the channel to be closed which will expose itself via a NodeDisconnectedException. At this point, we can just wait for a new master and retry, as with any other master channel exception. --- .../TransportReplicationAction.java | 12 +---- .../action/shard/ShardStateAction.java | 46 ++++++++----------- .../TransportReplicationActionTests.java | 2 +- .../action/shard/ShardStateActionTests.java | 44 +++++++----------- 4 files changed, 37 insertions(+), 67 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index b261851abdbcb..a025643599604 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -92,8 +92,6 @@ */ public abstract class TransportReplicationAction extends TransportAction { - public static final String SHARD_FAILURE_TIMEOUT = "action.support.replication.shard.failure_timeout"; - protected final TransportService transportService; protected final ClusterService clusterService; protected final IndicesService indicesService; @@ -101,7 +99,6 @@ public abstract class TransportReplicationAction> MASTER_CHANNEL_EXCEPTIONS = new HashSet<>(Arrays.asList(NotMasterException.class, NodeDisconnectedException.class)); + private static boolean isMasterChannelException(Throwable cause) { + return MASTER_CHANNEL_EXCEPTIONS.contains(cause.getClass()); } // visible for testing - protected void waitForNewMasterAndRetry(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, TransportRequestOptions options, Listener listener) { + protected void waitForNewMasterAndRetry(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) { observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { - sendShardFailed(observer, shardRoutingEntry, options, listener); + sendShardFailed(observer, shardRoutingEntry, listener); } @Override diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index c8bf95a6e0a3a..fc0a2719300c6 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -489,7 +489,7 @@ protected void runReplicateTest(IndexShardRoutingTable shardRoutingTable, int as TransportReplicationAction.ReplicationPhase replicationPhase = action.new ReplicationPhase(request, new Response(), - request.shardId(), createTransportChannel(listener), reference, null); + request.shardId(), createTransportChannel(listener), reference); assertThat(replicationPhase.totalShards(), equalTo(totalShards)); assertThat(replicationPhase.pending(), equalTo(assignedReplicas)); diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index c603d94ec3451..8aca784934398 100644 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -31,13 +31,12 @@ import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.cluster.TestClusterService; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.NodeDisconnectedException; import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import org.junit.After; @@ -45,6 +44,8 @@ import org.junit.Before; import org.junit.BeforeClass; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -66,26 +67,6 @@ public TestShardStateAction(Settings settings, ClusterService clusterService, Tr super(settings, clusterService, transportService, allocationService, routingService); } - private Runnable onBeforeTimeout; - - public void setOnBeforeTimeout(Runnable onBeforeTimeout) { - this.onBeforeTimeout = onBeforeTimeout; - } - - private Runnable onAfterTimeout; - - - public void setOnAfterTimeout(Runnable onAfterTimeout) { - this.onAfterTimeout = onAfterTimeout; - } - - @Override - protected void handleTimeout(ShardRoutingEntry shardRoutingEntry, ClusterStateObserver observer, TransportRequestOptions options, Listener listener) { - onBeforeTimeout.run(); - super.handleTimeout(shardRoutingEntry, observer, options, listener); - onAfterTimeout.run(); - } - private Runnable onBeforeWaitForNewMasterAndRetry; public void setOnBeforeWaitForNewMasterAndRetry(Runnable onBeforeWaitForNewMasterAndRetry) { @@ -99,9 +80,9 @@ public void setOnAfterWaitForNewMasterAndRetry(Runnable onAfterWaitForNewMasterA } @Override - protected void waitForNewMasterAndRetry(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, TransportRequestOptions options, Listener listener) { + protected void waitForNewMasterAndRetry(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) { onBeforeWaitForNewMasterAndRetry.run(); - super.waitForNewMasterAndRetry(observer, shardRoutingEntry, options, listener); + super.waitForNewMasterAndRetry(observer, shardRoutingEntry, listener); onAfterWaitForNewMasterAndRetry.run(); } } @@ -121,8 +102,6 @@ public void setUp() throws Exception { transportService.start(); this.timeout = new AtomicBoolean(); shardStateAction = new TestShardStateAction(Settings.EMPTY, clusterService, transportService, null, null); - shardStateAction.setOnBeforeTimeout(() -> {}); - shardStateAction.setOnAfterTimeout(() -> {}); shardStateAction.setOnBeforeWaitForNewMasterAndRetry(() -> {}); shardStateAction.setOnAfterWaitForNewMasterAndRetry(() -> {}); } @@ -173,7 +152,7 @@ public void onSuccess() { assertTrue(success.get()); } - public void testMasterLeft() throws InterruptedException { + public void testMasterChannelException() throws InterruptedException { final String index = "test"; clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); @@ -193,13 +172,22 @@ public void onSuccess() { success.set(true); latch.countDown(); } + + @Override + public void onShardFailedFailure(Exception e) { + success.set(false); + latch.countDown(); + } }); final CapturingTransport.CapturedRequest[] capturedRequests = transport.capturedRequests(); transport.clear(); assertThat(capturedRequests.length, equalTo(1)); assertFalse(success.get()); - transport.handleResponse(capturedRequests[0].requestId, new NotMasterException("simulated")); + List possibleExceptions = new ArrayList<>(); + possibleExceptions.add(new NotMasterException("simulated")); + possibleExceptions.add(new NodeDisconnectedException(clusterService.state().nodes().masterNode(), ShardStateAction.SHARD_FAILED_ACTION_NAME)); + transport.handleResponse(capturedRequests[0].requestId, randomFrom(possibleExceptions)); latch.await(); assertTrue(success.get()); From d55c5f6c4db2cf885022b73a948a9c2a52be882f Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 11 Jan 2016 14:35:16 -0500 Subject: [PATCH 05/13] Handle FailedToCommitClusterStateException in ShardStateAction This commit adds Discovery.FailedToCommitClusterStateException to the list of channel failures that ShardStateAction handles and retries. --- .../cluster/action/shard/ShardStateAction.java | 8 +++++++- .../cluster/action/shard/ShardStateActionTests.java | 6 ++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 71917e2e8c7d3..9a0d7e55ac376 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -44,6 +44,7 @@ import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.Discovery; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.EmptyTransportResponseHandler; @@ -121,7 +122,12 @@ public void handleException(TransportException exp) { } } - private static Set> MASTER_CHANNEL_EXCEPTIONS = new HashSet<>(Arrays.asList(NotMasterException.class, NodeDisconnectedException.class)); + private static Set> MASTER_CHANNEL_EXCEPTIONS = + new HashSet<>(Arrays.asList( + NotMasterException.class, + NodeDisconnectedException.class, + Discovery.FailedToCommitClusterStateException.class + )); private static boolean isMasterChannelException(Throwable cause) { return MASTER_CHANNEL_EXCEPTIONS.contains(cause.getClass()); } diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index 8aca784934398..ba445c939d380 100644 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.Discovery; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.cluster.TestClusterService; import org.elasticsearch.test.transport.CapturingTransport; @@ -49,6 +50,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithStartedPrimary; import static org.hamcrest.CoreMatchers.equalTo; @@ -163,6 +165,7 @@ public void testMasterChannelException() throws InterruptedException { AtomicBoolean noMaster = new AtomicBoolean(); AtomicBoolean retried = new AtomicBoolean(); AtomicBoolean success = new AtomicBoolean(); + AtomicReference exception = new AtomicReference<>(); setUpMasterRetryVerification(noMaster, retried, latch); @@ -176,6 +179,7 @@ public void onSuccess() { @Override public void onShardFailedFailure(Exception e) { success.set(false); + exception.set(e); latch.countDown(); } }); @@ -187,9 +191,11 @@ public void onShardFailedFailure(Exception e) { List possibleExceptions = new ArrayList<>(); possibleExceptions.add(new NotMasterException("simulated")); possibleExceptions.add(new NodeDisconnectedException(clusterService.state().nodes().masterNode(), ShardStateAction.SHARD_FAILED_ACTION_NAME)); + possibleExceptions.add(new Discovery.FailedToCommitClusterStateException("simulated")); transport.handleResponse(capturedRequests[0].requestId, randomFrom(possibleExceptions)); latch.await(); + assertNull(exception.get()); assertTrue(success.get()); } From 5a5d7881ed4c7b91fa711362a8e3c2154365af7c Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 12 Jan 2016 08:31:06 -0500 Subject: [PATCH 06/13] Remove dead field in o.e.c.a.s.ShardStateActionTests --- .../cluster/action/shard/ShardStateActionTests.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index ba445c939d380..db2e67eb30400 100644 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -58,7 +58,6 @@ public class ShardStateActionTests extends ESTestCase { private static ThreadPool THREAD_POOL; - private AtomicBoolean timeout; private TestShardStateAction shardStateAction; private CapturingTransport transport; private TransportService transportService; @@ -102,7 +101,6 @@ public void setUp() throws Exception { clusterService = new TestClusterService(THREAD_POOL); transportService = new TransportService(transport, THREAD_POOL); transportService.start(); - this.timeout = new AtomicBoolean(); shardStateAction = new TestShardStateAction(Settings.EMPTY, clusterService, transportService, null, null); shardStateAction.setOnBeforeWaitForNewMasterAndRetry(() -> {}); shardStateAction.setOnAfterWaitForNewMasterAndRetry(() -> {}); From 8f67dcc34852a0e6a5c1dc9dcf3d58bfe829a893 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 12 Jan 2016 08:34:38 -0500 Subject: [PATCH 07/13] Add Javadocs for exceptions that are handled by ShardStateAction --- .../cluster/action/shard/ShardStateAction.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 9a0d7e55ac376..b2b264c81f982 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -377,6 +377,21 @@ public interface Listener { default void onSuccess() { } + /** + * Notification for non-channel exceptions that are not handled + * by {@link ShardStateAction}. + * + * The exceptions that are handled by {@link ShardStateAction} + * are: + * - {@link NotMasterException} + * - {@link NodeDisconnectedException} + * - {@link Discovery.FailedToCommitClusterStateException} + * + * Any other exception is communicated to the requester via + * this notification. + * + * @param e the unexpected cause of the failure on the master + */ default void onShardFailedFailure(final Exception e) { } } From 7f78d52c72fedff82f1446999041709f87e2dcf1 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 14 Jan 2016 16:51:36 -0500 Subject: [PATCH 08/13] Use capture and clear convenience method --- .../replication/TransportReplicationActionTests.java | 3 +-- .../cluster/action/shard/ShardStateActionTests.java | 6 ++---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index fc0a2719300c6..b6b752a6ae9d7 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -566,8 +566,7 @@ action.new ReplicationPhase(request, // force a new cluster state to simulate a new master having been elected clusterService.setState(ClusterState.builder(clusterService.state())); transport.handleResponse(currentRequest.requestId, new NotMasterException("shard-failed-test")); - CapturingTransport.CapturedRequest[] retryRequests = transport.capturedRequests(); - transport.clear(); + CapturingTransport.CapturedRequest[] retryRequests = transport.getCapturedRequestsAndClear(); assertEquals(1, retryRequests.length); currentRequest = retryRequests[0]; } diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index db2e67eb30400..e9db5e404c7d1 100644 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -182,8 +182,7 @@ public void onShardFailedFailure(Exception e) { } }); - final CapturingTransport.CapturedRequest[] capturedRequests = transport.capturedRequests(); - transport.clear(); + final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertThat(capturedRequests.length, equalTo(1)); assertFalse(success.get()); List possibleExceptions = new ArrayList<>(); @@ -243,8 +242,7 @@ private void verifyRetry(AtomicBoolean invoked, AtomicBoolean retried, CountDown invoked.set(true); // assert a retry request was sent - final CapturingTransport.CapturedRequest[] capturedRequests = transport.capturedRequests(); - transport.clear(); + final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); retried.set(capturedRequests.length == 1); if (retried.get()) { // finish the request From efb142613ffc38b30db81cf7b01e5c2d24f1baff Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 15 Jan 2016 09:25:42 -0500 Subject: [PATCH 09/13] Tighten shard state action tests This commit tightens the tests in o.e.c.a.s.ShardStateActionTests: - adds a simple test for a success condition that validates the shard failed request is correct and sent to the correct place - remove redundant assertions from the no master and master left tests - an assertion that success is not falsely indicated in the case of a unhandled error --- .../action/shard/ShardStateActionTests.java | 58 +++++++++++++++---- 1 file changed, 48 insertions(+), 10 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index e9db5e404c7d1..6cbaa3d905552 100644 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -54,6 +54,8 @@ import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithStartedPrimary; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.Matchers.is; public class ShardStateActionTests extends ESTestCase { private static ThreadPool THREAD_POOL; @@ -119,6 +121,41 @@ public static void stopThreadPool() { THREAD_POOL = null; } + public void testSuccess() throws InterruptedException { + final String index = "test"; + + clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); + + String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); + + AtomicBoolean success = new AtomicBoolean(); + CountDownLatch latch = new CountDownLatch(1); + + ShardRouting shardRouting = getRandomShardRouting(index); + shardStateAction.shardFailed(shardRouting, indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + @Override + public void onSuccess() { + success.set(true); + latch.countDown(); + } + }); + + CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); + assertEquals(1, capturedRequests.length); + // the request is a shard failed request + assertThat(capturedRequests[0].request, is(instanceOf(ShardStateAction.ShardRoutingEntry.class))); + ShardStateAction.ShardRoutingEntry shardRoutingEntry = (ShardStateAction.ShardRoutingEntry)capturedRequests[0].request; + // for the right shard + assertEquals(shardRouting, shardRoutingEntry.getShardRouting()); + // sent to the master + assertEquals(clusterService.state().nodes().masterNode().getId(), capturedRequests[0].node.getId()); + + transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); + + latch.await(); + assertTrue(success.get()); + } + public void testNoMaster() throws InterruptedException { final String index = "test"; @@ -131,11 +168,10 @@ public void testNoMaster() throws InterruptedException { String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); CountDownLatch latch = new CountDownLatch(1); - AtomicBoolean noMaster = new AtomicBoolean(); AtomicBoolean retried = new AtomicBoolean(); AtomicBoolean success = new AtomicBoolean(); - setUpMasterRetryVerification(noMaster, retried, latch); + setUpMasterRetryVerification(retried, latch); shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override @@ -147,7 +183,6 @@ public void onSuccess() { latch.await(); - assertTrue(noMaster.get()); assertTrue(retried.get()); assertTrue(success.get()); } @@ -160,12 +195,11 @@ public void testMasterChannelException() throws InterruptedException { String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); CountDownLatch latch = new CountDownLatch(1); - AtomicBoolean noMaster = new AtomicBoolean(); AtomicBoolean retried = new AtomicBoolean(); AtomicBoolean success = new AtomicBoolean(); AtomicReference exception = new AtomicReference<>(); - setUpMasterRetryVerification(noMaster, retried, latch); + setUpMasterRetryVerification(retried, latch); shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override @@ -185,6 +219,7 @@ public void onShardFailedFailure(Exception e) { final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertThat(capturedRequests.length, equalTo(1)); assertFalse(success.get()); + assertFalse(retried.get()); List possibleExceptions = new ArrayList<>(); possibleExceptions.add(new NotMasterException("simulated")); possibleExceptions.add(new NodeDisconnectedException(clusterService.state().nodes().masterNode(), ShardStateAction.SHARD_FAILED_ACTION_NAME)); @@ -206,6 +241,11 @@ public void testUnhandledFailure() { AtomicBoolean failure = new AtomicBoolean(); shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + @Override + public void onSuccess() { + assert false; + } + @Override public void onShardFailedFailure(Exception e) { failure.set(true); @@ -228,19 +268,17 @@ private ShardRouting getRandomShardRouting(String index) { return shardRouting; } - private void setUpMasterRetryVerification(AtomicBoolean noMaster, AtomicBoolean retried, CountDownLatch latch) { + private void setUpMasterRetryVerification(AtomicBoolean retried, CountDownLatch latch) { shardStateAction.setOnBeforeWaitForNewMasterAndRetry(() -> { DiscoveryNodes.Builder masterBuilder = DiscoveryNodes.builder(clusterService.state().nodes()); masterBuilder.masterNodeId(clusterService.state().nodes().masterNodes().iterator().next().value.id()); clusterService.setState(ClusterState.builder(clusterService.state()).nodes(masterBuilder)); }); - shardStateAction.setOnAfterWaitForNewMasterAndRetry(() -> verifyRetry(noMaster, retried, latch)); + shardStateAction.setOnAfterWaitForNewMasterAndRetry(() -> verifyRetry(retried, latch)); } - private void verifyRetry(AtomicBoolean invoked, AtomicBoolean retried, CountDownLatch latch) { - invoked.set(true); - + private void verifyRetry(AtomicBoolean retried, CountDownLatch latch) { // assert a retry request was sent final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); retried.set(capturedRequests.length == 1); From fe39d11c55c5fea0518bf0c38c87cd15343416e7 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 15 Jan 2016 09:45:39 -0500 Subject: [PATCH 10/13] Logging in shard state action This commit adds a trace log on a cluster state update while waiting for a new master, and changes the log level on cluster service close to the warn level. --- .../elasticsearch/cluster/action/shard/ShardStateAction.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index b2b264c81f982..e76425a118828 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -137,12 +137,15 @@ protected void waitForNewMasterAndRetry(ClusterStateObserver observer, ShardRout observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { + if (logger.isTraceEnabled()) { + logger.trace("new cluster state [{}] after waiting for master election to fail shard [{}]", shardRoutingEntry.getShardRouting().shardId(), state.prettyPrint(), shardRoutingEntry); + } sendShardFailed(observer, shardRoutingEntry, listener); } @Override public void onClusterServiceClose() { - logger.error("{} node closed while handling failed shard [{}]", shardRoutingEntry.failure, shardRoutingEntry.getShardRouting().getId(), shardRoutingEntry.getShardRouting()); + logger.warn("{} node closed while handling failed shard [{}]", shardRoutingEntry.failure, shardRoutingEntry.getShardRouting().getId(), shardRoutingEntry.getShardRouting()); listener.onShardFailedFailure(new NodeClosedException(clusterService.localNode())); } From 7eefcbbeed4ca06aa8e5b4ff853dbb0d8d6cfde1 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 15 Jan 2016 11:26:32 -0500 Subject: [PATCH 11/13] Add retry loop in shard state action tests This commit enhances the master channel exception test in o.e.c.a.s.ShardStateActionTests to test that a retries loop as expected when requests to the master repeatedly fail. --- .../action/shard/ShardStateActionTests.java | 48 ++++++++++++------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index 6cbaa3d905552..f8f263976f79e 100644 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -50,7 +50,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.LongConsumer; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithStartedPrimary; import static org.hamcrest.CoreMatchers.equalTo; @@ -168,10 +170,10 @@ public void testNoMaster() throws InterruptedException { String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); CountDownLatch latch = new CountDownLatch(1); - AtomicBoolean retried = new AtomicBoolean(); + AtomicInteger retries = new AtomicInteger(); AtomicBoolean success = new AtomicBoolean(); - setUpMasterRetryVerification(retried, latch); + setUpMasterRetryVerification(1, retries, latch, requestId -> {}); shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override @@ -183,7 +185,7 @@ public void onSuccess() { latch.await(); - assertTrue(retried.get()); + assertThat(retries.get(), equalTo(1)); assertTrue(success.get()); } @@ -195,11 +197,20 @@ public void testMasterChannelException() throws InterruptedException { String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); CountDownLatch latch = new CountDownLatch(1); - AtomicBoolean retried = new AtomicBoolean(); + AtomicInteger retries = new AtomicInteger(); AtomicBoolean success = new AtomicBoolean(); AtomicReference exception = new AtomicReference<>(); - setUpMasterRetryVerification(retried, latch); + LongConsumer retryLoop = requestId -> { + List possibleExceptions = new ArrayList<>(); + possibleExceptions.add(new NotMasterException("simulated")); + possibleExceptions.add(new NodeDisconnectedException(clusterService.state().nodes().masterNode(), ShardStateAction.SHARD_FAILED_ACTION_NAME)); + possibleExceptions.add(new Discovery.FailedToCommitClusterStateException("simulated")); + transport.handleResponse(requestId, randomFrom(possibleExceptions)); + }; + + final int numberOfRetries = randomIntBetween(1, 256); + setUpMasterRetryVerification(numberOfRetries, retries, latch, retryLoop); shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override @@ -219,15 +230,12 @@ public void onShardFailedFailure(Exception e) { final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertThat(capturedRequests.length, equalTo(1)); assertFalse(success.get()); - assertFalse(retried.get()); - List possibleExceptions = new ArrayList<>(); - possibleExceptions.add(new NotMasterException("simulated")); - possibleExceptions.add(new NodeDisconnectedException(clusterService.state().nodes().masterNode(), ShardStateAction.SHARD_FAILED_ACTION_NAME)); - possibleExceptions.add(new Discovery.FailedToCommitClusterStateException("simulated")); - transport.handleResponse(capturedRequests[0].requestId, randomFrom(possibleExceptions)); + assertThat(retries.get(), equalTo(0)); + retryLoop.accept(capturedRequests[0].requestId); latch.await(); assertNull(exception.get()); + assertThat(retries.get(), equalTo(numberOfRetries)); assertTrue(success.get()); } @@ -268,23 +276,27 @@ private ShardRouting getRandomShardRouting(String index) { return shardRouting; } - private void setUpMasterRetryVerification(AtomicBoolean retried, CountDownLatch latch) { + private void setUpMasterRetryVerification(int numberOfRetries, AtomicInteger retries, CountDownLatch latch, LongConsumer retryLoop) { shardStateAction.setOnBeforeWaitForNewMasterAndRetry(() -> { DiscoveryNodes.Builder masterBuilder = DiscoveryNodes.builder(clusterService.state().nodes()); masterBuilder.masterNodeId(clusterService.state().nodes().masterNodes().iterator().next().value.id()); clusterService.setState(ClusterState.builder(clusterService.state()).nodes(masterBuilder)); }); - shardStateAction.setOnAfterWaitForNewMasterAndRetry(() -> verifyRetry(retried, latch)); + shardStateAction.setOnAfterWaitForNewMasterAndRetry(() -> verifyRetry(numberOfRetries, retries, latch, retryLoop)); } - private void verifyRetry(AtomicBoolean retried, CountDownLatch latch) { + private void verifyRetry(int numberOfRetries, AtomicInteger retries, CountDownLatch latch, LongConsumer retryLoop) { // assert a retry request was sent final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); - retried.set(capturedRequests.length == 1); - if (retried.get()) { - // finish the request - transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); + if (capturedRequests.length == 1) { + retries.incrementAndGet(); + if (retries.get() == numberOfRetries) { + // finish the request + transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); + } else { + retryLoop.accept(capturedRequests[0].requestId); + } } else { // there failed to be a retry request // release the driver thread to fail the test From cf3c0ed049fe3337e19ed7068bfa13292c7eb5cc Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 15 Jan 2016 16:48:08 -0500 Subject: [PATCH 12/13] Sanity assertion that exception cause is not null This commit adds a sanity assertion that the cause of a transport exception when sending a shard failure is not null. --- .../org/elasticsearch/cluster/action/shard/ShardStateAction.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index e76425a118828..a9c69007aeeef 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -111,6 +111,7 @@ public void handleResponse(TransportResponse.Empty response) { @Override public void handleException(TransportException exp) { + assert exp.getCause() != null : exp; if (isMasterChannelException(exp.getCause())) { waitForNewMasterAndRetry(observer, shardRoutingEntry, listener); } else { From 386d2ab86a33fa83cfa770f0d37542572136736f Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sun, 17 Jan 2016 10:36:24 -0500 Subject: [PATCH 13/13] More tightening of shard state action tests This commit adds some additional assertions that test success is not falsely indicated by adding assertions that success / failure methods are not incorrectly invoked in failure / success scenarios. --- .../action/shard/ShardStateActionTests.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index f8f263976f79e..8bc43114e7d44 100644 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -140,6 +140,13 @@ public void onSuccess() { success.set(true); latch.countDown(); } + + @Override + public void onShardFailedFailure(Exception e) { + success.set(false); + latch.countDown(); + assert false; + } }); CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); @@ -181,6 +188,13 @@ public void onSuccess() { success.set(true); latch.countDown(); } + + @Override + public void onShardFailedFailure(Exception e) { + success.set(false); + latch.countDown(); + assert false; + } }); latch.await(); @@ -224,6 +238,7 @@ public void onShardFailedFailure(Exception e) { success.set(false); exception.set(e); latch.countDown(); + assert false; } }); @@ -251,6 +266,7 @@ public void testUnhandledFailure() { shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override public void onSuccess() { + failure.set(false); assert false; }