diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 0014404057f50..a025643599604 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -64,7 +64,6 @@ import org.elasticsearch.transport.BaseTransportResponseHandler; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.EmptyTransportResponseHandler; -import org.elasticsearch.transport.ReceiveTimeoutTransportException; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportChannelResponseHandler; import org.elasticsearch.transport.TransportException; @@ -76,6 +75,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -92,8 +92,6 @@ */ public abstract class TransportReplicationAction extends TransportAction { - public static final String SHARD_FAILURE_TIMEOUT = "action.support.replication.shard.failure_timeout"; - protected final TransportService transportService; protected final ClusterService clusterService; protected final IndicesService indicesService; @@ -101,7 +99,6 @@ public abstract class TransportReplicationAction> MASTER_CHANNEL_EXCEPTIONS = + new HashSet<>(Arrays.asList( + NotMasterException.class, + NodeDisconnectedException.class, + Discovery.FailedToCommitClusterStateException.class + )); + private static boolean isMasterChannelException(Throwable cause) { + return MASTER_CHANNEL_EXCEPTIONS.contains(cause.getClass()); + } + + // visible for testing + protected void waitForNewMasterAndRetry(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) { + observer.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + if (logger.isTraceEnabled()) { + logger.trace("new cluster state [{}] after waiting for master election to fail shard [{}]", shardRoutingEntry.getShardRouting().shardId(), state.prettyPrint(), shardRoutingEntry); } - }); + sendShardFailed(observer, shardRoutingEntry, listener); + } + + @Override + public void onClusterServiceClose() { + logger.warn("{} node closed while handling failed shard [{}]", shardRoutingEntry.failure, shardRoutingEntry.getShardRouting().getId(), shardRoutingEntry.getShardRouting()); + listener.onShardFailedFailure(new NodeClosedException(clusterService.localNode())); + } + + @Override + public void onTimeout(TimeValue timeout) { + // we wait indefinitely for a new master + assert false; + } + }, MasterNodeChangePredicate.INSTANCE); } private static class ShardFailedTransportHandler implements TransportRequestHandler { @@ -334,10 +381,22 @@ public interface Listener { default void onSuccess() { } - default void onShardFailedNoMaster() { - } - - default void onShardFailedFailure(final DiscoveryNode master, final TransportException e) { + /** + * Notification for non-channel exceptions that are not handled + * by {@link ShardStateAction}. + * + * The exceptions that are handled by {@link ShardStateAction} + * are: + * - {@link NotMasterException} + * - {@link NodeDisconnectedException} + * - {@link Discovery.FailedToCommitClusterStateException} + * + * Any other exception is communicated to the requester via + * this notification. + * + * @param e the unexpected cause of the failure on the master + */ + default void onShardFailedFailure(final Exception e) { } } } diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 9357de7b1eb2f..6cb30789dda12 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -458,7 +458,7 @@ private void applyNewOrUpdatedShards(final ClusterChangedEvent event) { if (!indexService.hasShard(shardId) && shardRouting.started()) { if (failedShards.containsKey(shardRouting.shardId())) { if (nodes.masterNode() != null) { - shardStateAction.resendShardFailed(event.state(), shardRouting, indexMetaData.getIndexUUID(), + shardStateAction.resendShardFailed(shardRouting, indexMetaData.getIndexUUID(), "master " + nodes.masterNode() + " marked shard as started, but shard has previous failed. resending shard failure.", null, SHARD_STATE_ACTION_LISTENER); } } else { @@ -590,7 +590,7 @@ private void applyInitializingShard(final ClusterState state, final IndexMetaDat if (!indexService.hasShard(shardId)) { if (failedShards.containsKey(shardRouting.shardId())) { if (nodes.masterNode() != null) { - shardStateAction.resendShardFailed(state, shardRouting, indexMetaData.getIndexUUID(), + shardStateAction.resendShardFailed(shardRouting, indexMetaData.getIndexUUID(), "master " + nodes.masterNode() + " marked shard as initializing, but shard is marked as failed, resend shard failure", null, SHARD_STATE_ACTION_LISTENER); } return; @@ -788,7 +788,7 @@ private void sendFailShard(ShardRouting shardRouting, String indexUUID, String m try { logger.warn("[{}] marking and sending shard failed due to [{}]", failure, shardRouting.shardId(), message); failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version())); - shardStateAction.shardFailed(clusterService.state(), shardRouting, indexUUID, message, failure, SHARD_STATE_ACTION_LISTENER); + shardStateAction.shardFailed(shardRouting, indexUUID, message, failure, SHARD_STATE_ACTION_LISTENER); } catch (Throwable e1) { logger.warn("[{}][{}] failed to mark shard as failed (because of [{}])", e1, shardRouting.getIndex(), shardRouting.getId(), message); } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 0a390ea37062d..b6b752a6ae9d7 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -488,7 +489,7 @@ protected void runReplicateTest(IndexShardRoutingTable shardRoutingTable, int as TransportReplicationAction.ReplicationPhase replicationPhase = action.new ReplicationPhase(request, new Response(), - request.shardId(), createTransportChannel(listener), reference, null); + request.shardId(), createTransportChannel(listener), reference); assertThat(replicationPhase.totalShards(), equalTo(totalShards)); assertThat(replicationPhase.pending(), equalTo(assignedReplicas)); @@ -557,7 +558,23 @@ action.new ReplicationPhase(request, // the shard the request was sent to and the shard to be failed should be the same assertEquals(shardRoutingEntry.getShardRouting(), routing); failures.add(shardFailedRequest); - transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE); + if (randomBoolean()) { + // simulate master left and test that the shard failure is retried + int numberOfRetries = randomIntBetween(1, 4); + CapturingTransport.CapturedRequest currentRequest = shardFailedRequest; + for (int retryNumber = 0; retryNumber < numberOfRetries; retryNumber++) { + // force a new cluster state to simulate a new master having been elected + clusterService.setState(ClusterState.builder(clusterService.state())); + transport.handleResponse(currentRequest.requestId, new NotMasterException("shard-failed-test")); + CapturingTransport.CapturedRequest[] retryRequests = transport.getCapturedRequestsAndClear(); + assertEquals(1, retryRequests.length); + currentRequest = retryRequests[0]; + } + // now simulate that the last retry succeeded + transport.handleResponse(currentRequest.requestId, TransportResponse.Empty.INSTANCE); + } else { + transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE); + } } } else { successful++; diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index f69c9149a9c97..8bc43114e7d44 100644 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -20,41 +20,78 @@ package org.elasticsearch.cluster.action.shard; import org.apache.lucene.index.CorruptIndexException; +import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.ClusterStateObserver; +import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.Discovery; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.cluster.TestClusterService; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.ReceiveTimeoutTransportException; +import org.elasticsearch.transport.NodeDisconnectedException; import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.LongConsumer; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithStartedPrimary; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.Matchers.is; public class ShardStateActionTests extends ESTestCase { private static ThreadPool THREAD_POOL; - private ShardStateAction shardStateAction; + private TestShardStateAction shardStateAction; private CapturingTransport transport; private TransportService transportService; private TestClusterService clusterService; + private static class TestShardStateAction extends ShardStateAction { + public TestShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService, AllocationService allocationService, RoutingService routingService) { + super(settings, clusterService, transportService, allocationService, routingService); + } + + private Runnable onBeforeWaitForNewMasterAndRetry; + + public void setOnBeforeWaitForNewMasterAndRetry(Runnable onBeforeWaitForNewMasterAndRetry) { + this.onBeforeWaitForNewMasterAndRetry = onBeforeWaitForNewMasterAndRetry; + } + + private Runnable onAfterWaitForNewMasterAndRetry; + + public void setOnAfterWaitForNewMasterAndRetry(Runnable onAfterWaitForNewMasterAndRetry) { + this.onAfterWaitForNewMasterAndRetry = onAfterWaitForNewMasterAndRetry; + } + + @Override + protected void waitForNewMasterAndRetry(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) { + onBeforeWaitForNewMasterAndRetry.run(); + super.waitForNewMasterAndRetry(observer, shardRoutingEntry, listener); + onAfterWaitForNewMasterAndRetry.run(); + } + } + @BeforeClass public static void startThreadPool() { THREAD_POOL = new ThreadPool("ShardStateActionTest"); @@ -68,7 +105,9 @@ public void setUp() throws Exception { clusterService = new TestClusterService(THREAD_POOL); transportService = new TransportService(transport, THREAD_POOL); transportService.start(); - shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null); + shardStateAction = new TestShardStateAction(Settings.EMPTY, clusterService, transportService, null, null); + shardStateAction.setOnBeforeWaitForNewMasterAndRetry(() -> {}); + shardStateAction.setOnAfterWaitForNewMasterAndRetry(() -> {}); } @Override @@ -84,94 +123,165 @@ public static void stopThreadPool() { THREAD_POOL = null; } - public void testNoMaster() { + public void testSuccess() throws InterruptedException { final String index = "test"; clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); - DiscoveryNodes.Builder builder = DiscoveryNodes.builder(clusterService.state().nodes()); - builder.masterNodeId(null); - clusterService.setState(ClusterState.builder(clusterService.state()).nodes(builder)); - String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); - AtomicBoolean noMaster = new AtomicBoolean(); - assert !noMaster.get(); + AtomicBoolean success = new AtomicBoolean(); + CountDownLatch latch = new CountDownLatch(1); - shardStateAction.shardFailed(clusterService.state(), getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + ShardRouting shardRouting = getRandomShardRouting(index); + shardStateAction.shardFailed(shardRouting, indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override - public void onShardFailedNoMaster() { - noMaster.set(true); + public void onSuccess() { + success.set(true); + latch.countDown(); } @Override - public void onShardFailedFailure(DiscoveryNode master, TransportException e) { - + public void onShardFailedFailure(Exception e) { + success.set(false); + latch.countDown(); + assert false; } }); - assertTrue(noMaster.get()); + CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); + assertEquals(1, capturedRequests.length); + // the request is a shard failed request + assertThat(capturedRequests[0].request, is(instanceOf(ShardStateAction.ShardRoutingEntry.class))); + ShardStateAction.ShardRoutingEntry shardRoutingEntry = (ShardStateAction.ShardRoutingEntry)capturedRequests[0].request; + // for the right shard + assertEquals(shardRouting, shardRoutingEntry.getShardRouting()); + // sent to the master + assertEquals(clusterService.state().nodes().masterNode().getId(), capturedRequests[0].node.getId()); + + transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); + + latch.await(); + assertTrue(success.get()); } - public void testFailure() { + public void testNoMaster() throws InterruptedException { final String index = "test"; clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); + DiscoveryNodes.Builder noMasterBuilder = DiscoveryNodes.builder(clusterService.state().nodes()); + noMasterBuilder.masterNodeId(null); + clusterService.setState(ClusterState.builder(clusterService.state()).nodes(noMasterBuilder)); + String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); - AtomicBoolean failure = new AtomicBoolean(); - assert !failure.get(); + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger retries = new AtomicInteger(); + AtomicBoolean success = new AtomicBoolean(); - shardStateAction.shardFailed(clusterService.state(), getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { - @Override - public void onShardFailedNoMaster() { + setUpMasterRetryVerification(1, retries, latch, requestId -> {}); + shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + @Override + public void onSuccess() { + success.set(true); + latch.countDown(); } @Override - public void onShardFailedFailure(DiscoveryNode master, TransportException e) { - failure.set(true); + public void onShardFailedFailure(Exception e) { + success.set(false); + latch.countDown(); + assert false; } }); - final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); - assertThat(capturedRequests.length, equalTo(1)); - assert !failure.get(); - transport.handleResponse(capturedRequests[0].requestId, new TransportException("simulated")); + latch.await(); - assertTrue(failure.get()); + assertThat(retries.get(), equalTo(1)); + assertTrue(success.get()); } - public void testTimeout() throws InterruptedException { + public void testMasterChannelException() throws InterruptedException { final String index = "test"; clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); - AtomicBoolean progress = new AtomicBoolean(); - AtomicBoolean timedOut = new AtomicBoolean(); - - TimeValue timeout = new TimeValue(1, TimeUnit.MILLISECONDS); CountDownLatch latch = new CountDownLatch(1); - shardStateAction.shardFailed(clusterService.state(), getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), timeout, new ShardStateAction.Listener() { + AtomicInteger retries = new AtomicInteger(); + AtomicBoolean success = new AtomicBoolean(); + AtomicReference exception = new AtomicReference<>(); + + LongConsumer retryLoop = requestId -> { + List possibleExceptions = new ArrayList<>(); + possibleExceptions.add(new NotMasterException("simulated")); + possibleExceptions.add(new NodeDisconnectedException(clusterService.state().nodes().masterNode(), ShardStateAction.SHARD_FAILED_ACTION_NAME)); + possibleExceptions.add(new Discovery.FailedToCommitClusterStateException("simulated")); + transport.handleResponse(requestId, randomFrom(possibleExceptions)); + }; + + final int numberOfRetries = randomIntBetween(1, 256); + setUpMasterRetryVerification(numberOfRetries, retries, latch, retryLoop); + + shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + @Override + public void onSuccess() { + success.set(true); + latch.countDown(); + } + @Override - public void onShardFailedFailure(DiscoveryNode master, TransportException e) { - if (e instanceof ReceiveTimeoutTransportException) { - assertFalse(progress.get()); - timedOut.set(true); - } + public void onShardFailedFailure(Exception e) { + success.set(false); + exception.set(e); latch.countDown(); + assert false; } }); + final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); + assertThat(capturedRequests.length, equalTo(1)); + assertFalse(success.get()); + assertThat(retries.get(), equalTo(0)); + retryLoop.accept(capturedRequests[0].requestId); + latch.await(); - progress.set(true); - assertTrue(timedOut.get()); + assertNull(exception.get()); + assertThat(retries.get(), equalTo(numberOfRetries)); + assertTrue(success.get()); + } + + public void testUnhandledFailure() { + final String index = "test"; + + clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); + + String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); + + AtomicBoolean failure = new AtomicBoolean(); + + shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + @Override + public void onSuccess() { + failure.set(false); + assert false; + } + + @Override + public void onShardFailedFailure(Exception e) { + failure.set(true); + } + }); final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertThat(capturedRequests.length, equalTo(1)); + assertFalse(failure.get()); + transport.handleResponse(capturedRequests[0].requestId, new TransportException("simulated")); + + assertTrue(failure.get()); } private ShardRouting getRandomShardRouting(String index) { @@ -182,6 +292,34 @@ private ShardRouting getRandomShardRouting(String index) { return shardRouting; } + private void setUpMasterRetryVerification(int numberOfRetries, AtomicInteger retries, CountDownLatch latch, LongConsumer retryLoop) { + shardStateAction.setOnBeforeWaitForNewMasterAndRetry(() -> { + DiscoveryNodes.Builder masterBuilder = DiscoveryNodes.builder(clusterService.state().nodes()); + masterBuilder.masterNodeId(clusterService.state().nodes().masterNodes().iterator().next().value.id()); + clusterService.setState(ClusterState.builder(clusterService.state()).nodes(masterBuilder)); + }); + + shardStateAction.setOnAfterWaitForNewMasterAndRetry(() -> verifyRetry(numberOfRetries, retries, latch, retryLoop)); + } + + private void verifyRetry(int numberOfRetries, AtomicInteger retries, CountDownLatch latch, LongConsumer retryLoop) { + // assert a retry request was sent + final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); + if (capturedRequests.length == 1) { + retries.incrementAndGet(); + if (retries.get() == numberOfRetries) { + // finish the request + transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); + } else { + retryLoop.accept(capturedRequests[0].requestId); + } + } else { + // there failed to be a retry request + // release the driver thread to fail the test + latch.countDown(); + } + } + private Throwable getSimulatedFailure() { return new CorruptIndexException("simulated", (String) null); }